You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sd...@apache.org on 2022/02/04 09:52:56 UTC

[ignite-3] branch main updated: IGNITE-16260 User object serialization performance optimization

This is an automated email from the ASF dual-hosted git repository.

sdanilov pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new 6f171d6  IGNITE-16260 User object serialization performance optimization
6f171d6 is described below

commit 6f171d6a197bf26e7aad9f8dcf7a1103096f1190
Author: Roman Puchkovskiy <ro...@gmail.com>
AuthorDate: Tue Feb 1 15:36:35 2022 +0400

    IGNITE-16260 User object serialization performance optimization
    
    Co-authored-by: Semyon Danilov <sa...@yandex.ru>
---
 .../ignite/internal/util/FastTimestamps.java       |  65 ++
 .../ignite/internal/util/StringIntrospection.java  | 139 ++++
 .../ignite/internal/util/io/IgniteDataInput.java   | 309 ++++++++
 .../ignite/internal/util/io/IgniteDataOutput.java  | 138 ++++
 .../internal/util/io/IgniteUnsafeDataInput.java    | 800 +++++++++++++++++++++
 .../internal/util/io/IgniteUnsafeDataOutput.java   | 702 ++++++++++++++++++
 .../apache/ignite/internal/util/io}/VarInts.java   |  12 +-
 .../internal/util/StringIntrospectionTest.java     |  63 ++
 .../ignite/internal/util/io/IgniteTestIoUtils.java | 148 ++++
 .../IgniteUnsafeDataInputOutputByteOrderTest.java  | 240 +++++++
 .../util/io/IgniteUnsafeDataInputTest.java}        |  37 +-
 .../io/IgniteUnsafeDataOutputArraySizingTest.java  | 123 ++++
 .../ignite/internal/util/io}/VarIntsTest.java      |   2 +-
 modules/network/pom.xml                            |  29 +
 .../stream/DirectByteBufferStreamImplV1.java       |   2 +-
 .../network/serialization/BuiltInType.java         |   5 +-
 .../network/serialization/BuiltInTypeIds.java      |   9 +
 .../network/serialization/ClassDescriptor.java     |  43 +-
 .../serialization/ClassDescriptorRegistry.java     |   2 +
 .../internal/network/serialization/Classes.java    |   9 +-
 .../network/serialization/FieldDescriptor.java     |   4 +-
 .../PerSessionSerializationService.java            |  11 +-
 .../marshal/BuiltInContainerMarshallers.java       |  24 +-
 .../serialization/marshal/BuiltInMarshalling.java  | 209 +++---
 .../marshal/BuiltInNonContainerMarshallers.java    |  51 +-
 .../marshal/DefaultFieldsReaderWriter.java         |  11 +-
 .../marshal/DefaultUserObjectMarshaller.java       | 101 ++-
 .../marshal/ExternalizableMarshaller.java          |  10 +-
 .../marshal/{Bits.java => LittleEndianBits.java}   |  67 +-
 .../serialization/marshal/LocalDescriptors.java    |   5 +
 .../serialization/marshal/MarshalledObject.java    |  28 +-
 .../serialization/marshal/MarshallingContext.java  |  44 +-
 .../marshal/MarshallingValidations.java            |  61 +-
 .../serialization/marshal/ProtocolMarshalling.java |   1 +
 .../serialization/marshal/ProxyMarshaller.java     |  12 +-
 .../marshal/StructuredObjectMarshaller.java        |  63 +-
 .../serialization/marshal/TypedValueReader.java    |   8 +-
 .../serialization/marshal/TypedValueWriter.java    |   8 +-
 .../marshal/UnmarshallingContext.java              |  17 +-
 ...ValueWriter.java => UosIgniteOutputStream.java} |  34 +-
 .../marshal/UosObjectInputStream.java              |  25 +-
 .../marshal/UosObjectOutputStream.java             |  20 +-
 .../network/serialization/marshal/ValueReader.java |   8 +-
 .../network/serialization/marshal/ValueWriter.java |   8 +-
 .../ignite/internal/network/AllTypesMessage.java   |   3 +-
 .../internal/network/AllTypesMessageGenerator.java |  22 +-
 .../network/SerializationMicroBenchmark.java       | 307 ++++++++
 .../ignite/internal/network/UosProfilerTarget.java |  97 +++
 .../serialization/BuiltInDescriptorsTest.java      |   2 +
 .../network/serialization/ClassesTest.java         |  10 +
 .../network/serialization/MarshallableTest.java    |   4 +-
 .../DefaultUserObjectMarshallerCommonTest.java     |  74 ++
 ...rConcreteTypesKnownUpfrontOptimizationTest.java |  53 +-
 ...erObjectMarshallerWithArbitraryObjectsTest.java |  41 +-
 ...efaultUserObjectMarshallerWithBuiltinsTest.java |  80 ++-
 ...UserObjectMarshallerWithExternalizableTest.java |   9 +-
 ...shallerWithSerializableOverrideStreamsTest.java |  45 +-
 ...ltUserObjectMarshallerWithSerializableTest.java |   6 +-
 parent/pom.xml                                     |   8 +
 59 files changed, 3993 insertions(+), 475 deletions(-)

diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/FastTimestamps.java b/modules/core/src/main/java/org/apache/ignite/internal/util/FastTimestamps.java
new file mode 100644
index 0000000..af6f005
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/FastTimestamps.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.util;
+
+/**
+ * Provides access to fast (low-latency), but coarse-grained timestamps.
+ */
+public class FastTimestamps {
+    private static volatile long coarseCurrentTimeMillis = System.currentTimeMillis();
+
+    private static final long UPDATE_FREQUENCY_MS = 10;
+
+    static {
+        startUpdater();
+    }
+
+    private static void startUpdater() {
+        Thread updater = new Thread("FastTimestamps updater") {
+            @Override
+            public void run() {
+                while (true) {
+                    coarseCurrentTimeMillis = System.currentTimeMillis();
+                    try {
+                        Thread.sleep(UPDATE_FREQUENCY_MS);
+                    } catch (InterruptedException e) {
+                        break;
+                    }
+                }
+            }
+        };
+
+        updater.setDaemon(true);
+        updater.setPriority(10);
+        updater.start();
+    }
+
+    /**
+     * Returns number of milliseconds passed since Unix Epoch (1970-01-01) with a coarse resolution.
+     * The resolution is currently 10ms. This method works a lot faster (2 ns vs 11000 ns on a developer machine)
+     * than {@link System#currentTimeMillis()}.
+     *
+     * @return number of milliseconds passed since Unix Epoch (1970-01-01) with a coarse resolution
+     */
+    public static long coarseCurrentTimeMillis() {
+        return coarseCurrentTimeMillis;
+    }
+
+    private FastTimestamps() {
+    }
+}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/StringIntrospection.java b/modules/core/src/main/java/org/apache/ignite/internal/util/StringIntrospection.java
new file mode 100644
index 0000000..66df530
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/StringIntrospection.java
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.util;
+
+import java.lang.reflect.Field;
+import java.nio.charset.StandardCharsets;
+import java.util.Optional;
+
+/**
+ * Utils for introspecting a String efficiently.
+ */
+public class StringIntrospection {
+    private static final boolean USE_UNSAFE_TO_GET_LATIN1_BYTES;
+    private static final long STRING_CODER_FIELD_OFFSET;
+    private static final long STRING_VALUE_FIELD_OFFSET;
+
+    private static final byte LATIN1 = 0;
+
+    private static final long NO_OFFSET = Long.MIN_VALUE;
+
+    static {
+        Optional<Boolean> maybeCompactStrings = compactStrings();
+        Optional<Long> maybeCoderFieldOffset = coderFieldOffset();
+
+        USE_UNSAFE_TO_GET_LATIN1_BYTES = maybeCompactStrings.isPresent() && maybeCoderFieldOffset.isPresent();
+        STRING_CODER_FIELD_OFFSET = maybeCoderFieldOffset.orElse(NO_OFFSET);
+
+        Optional<Long> maybeValueFieldOffset = byteValueFieldOffset();
+        STRING_VALUE_FIELD_OFFSET = maybeValueFieldOffset.orElse(NO_OFFSET);
+    }
+
+    private static Optional<Boolean> compactStrings() {
+        return compactStringsField()
+                .map(field -> {
+                    try {
+                        return (Boolean) field.get(null);
+                    } catch (IllegalAccessException e) {
+                        throw new IllegalStateException("Should not be thrown", e);
+                    }
+                });
+    }
+
+    private static Optional<Field> compactStringsField() {
+        return stringField("COMPACT_STRINGS")
+                .map(field -> {
+                    field.setAccessible(true);
+                    return field;
+                });
+    }
+
+    private static Optional<Field> stringField(String name) {
+        try {
+            return Optional.of(String.class.getDeclaredField(name));
+        } catch (NoSuchFieldException e) {
+            return Optional.empty();
+        }
+    }
+
+    private static Optional<Long> coderFieldOffset() {
+        return stringField("coder").map(GridUnsafe::objectFieldOffset);
+    }
+
+    private static Optional<Long> byteValueFieldOffset() {
+        return stringField("value")
+                .filter(field -> field.getType() == byte[].class)
+                .map(GridUnsafe::objectFieldOffset);
+    }
+
+    /**
+     * Returns {@code true} if the current String is represented as Latin1 internally AND we can get access to that
+     * representation fast.
+     *
+     * @param str   the string to check
+     * @return {@code true} if the current String is represented as Latin1 internally AND we can get access to that
+     *     representation fast
+     */
+    public static boolean supportsFastGetLatin1Bytes(String str) {
+        if (!USE_UNSAFE_TO_GET_LATIN1_BYTES) {
+            return false;
+        }
+        return GridUnsafe.getByteField(str, STRING_CODER_FIELD_OFFSET) == LATIN1;
+    }
+
+    /**
+     * Returns a byte array with ASCII representation of the string. This *ONLY* returns a meaningful result
+     * if each string character fits ASCII encoding (that is, its code is less than 128)!
+     * This may return the internal buffer of the string, so *THE ARRAY CONTENTS SHOULD NEVER BE MODIFIED!*.
+     * The method is 'fast' because in the current Hotspot JVMs (versions 9+) it avoids copying string bytes (as well
+     * as encoding/decoding), it just returns the internal String buffer.
+     *
+     * @param str string to work with
+     * @return byte represenation of an ASCII string
+     */
+    public static byte[] fastAsciiBytes(String str) {
+        if (STRING_VALUE_FIELD_OFFSET != NO_OFFSET) {
+            return (byte[]) GridUnsafe.getObjectField(str, STRING_VALUE_FIELD_OFFSET);
+        } else {
+            // Fallback: something is different, let's not fail here, just pay a performance penalty.
+            return str.getBytes(StandardCharsets.US_ASCII);
+        }
+    }
+
+    /**
+     * Returns a byte array with Latin1 representation of the string. This *ONLY* returns a meaningful result
+     * if each string character fits Latin1 encoding!
+     * This may return the internal buffer of the string, so *THE ARRAY CONTENTS SHOULD NEVER BE MODIFIED!*.
+     * The method is 'fast' because in the current Hotspot JVMs (versions 9+) it avoids copying string bytes (as well
+     * as encoding/decoding), it just returns the internal String buffer.
+     *
+     * @param str string to work with
+     * @return byte represenation of a Latin1 string
+     */
+    public static byte[] fastLatin1Bytes(String str) {
+        if (STRING_VALUE_FIELD_OFFSET != NO_OFFSET) {
+            return (byte[]) GridUnsafe.getObjectField(str, STRING_VALUE_FIELD_OFFSET);
+        } else {
+            // Fallback: something is different, let's not fail here, just pay performance penalty.
+            return str.getBytes(StandardCharsets.ISO_8859_1);
+        }
+    }
+
+    private StringIntrospection() {
+    }
+}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/io/IgniteDataInput.java b/modules/core/src/main/java/org/apache/ignite/internal/util/io/IgniteDataInput.java
new file mode 100644
index 0000000..8306f66
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/io/IgniteDataInput.java
@@ -0,0 +1,309 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.util.io;
+
+import java.io.DataInput;
+import java.io.IOException;
+import java.io.InputStream;
+
+/**
+ * Extended data input.
+ */
+public interface IgniteDataInput extends DataInput {
+    /**
+     * Sets the internal buffer.
+     *
+     * @param bytes Bytes.
+     * @param len Length.
+     */
+    void bytes(byte[] bytes, int len);
+
+    /**
+     * Sets the {@link InputStream} from which to pull data.
+     *
+     * @param in Underlying input stream.
+     * @throws IOException In case of error.
+     */
+    void inputStream(InputStream in) throws IOException;
+
+    /**
+     * Resets data input to the position remembered with {@link #mark(int)}.
+     *
+     * @throws IOException In case of error.
+     */
+    void reset() throws IOException;
+
+    /**
+     * Makes the data input ready for reuse.
+     *
+     * @throws IOException if something goes wrong
+     */
+    void cleanup() throws IOException;
+
+    /**
+     * Reads one byte and returns it or a negative value if the end of stream is reached.
+     *
+     * @return The next byte of data, or {@code -1} if the end of the stream is reached.
+     * @exception IOException In case of error.
+     */
+    int read() throws IOException;
+
+    /**
+     * Reads data to fill (probably, partly) the given array.
+     *
+     * @param b Buffer into which the data is read.
+     * @return Total number of bytes read into the buffer, or {@code -1} is there is no
+     *     more data because the end of the stream has been reached.
+     * @exception IOException In case of error.
+     */
+    int read(byte[] b) throws IOException;
+
+    /**
+     * Reads data to fill (probably, partly) the given array region.
+     *
+     * @param b Buffer into which the data is read.
+     * @param off Start offset.
+     * @param len Maximum number of bytes to read.
+     * @return Total number of bytes read into the buffer, or {@code -1} is there is no
+     *     more data because the end of the stream has been reached.
+     * @exception IOException In case of error.
+     */
+    int read(byte[] b, int off, int len) throws IOException;
+
+    /**
+     * Reads array of {@code byte}s.
+     *
+     * @return Array.
+     * @throws IOException In case of error.
+     */
+    byte[] readByteArray(int length) throws IOException;
+
+    /**
+     * Reads array of {@code short}s.
+     *
+     * @return Array.
+     * @throws IOException In case of error.
+     */
+    short[] readShortArray(int length) throws IOException;
+
+    /**
+     * Reads array of {@code int}s.
+     *
+     * @return Array.
+     * @throws IOException In case of error.
+     */
+    int[] readIntArray(int length) throws IOException;
+
+    /**
+     * Reads array of {@code long}s.
+     *
+     * @return Array.
+     * @throws IOException In case of error.
+     */
+    long[] readLongArray(int length) throws IOException;
+
+    /**
+     * Reads array of {@code float}s.
+     *
+     * @return Array.
+     * @throws IOException In case of error.
+     */
+    float[] readFloatArray(int length) throws IOException;
+
+    /**
+     * Reads array of {@code double}s.
+     *
+     * @return Array.
+     * @throws IOException In case of error.
+     */
+    double[] readDoubleArray(int length) throws IOException;
+
+    /**
+     * Reads array of {@code boolean}s.
+     *
+     * @return Array.
+     * @throws IOException In case of error.
+     */
+    boolean[] readBooleanArray(int length) throws IOException;
+
+    /**
+     * Reads array of {@code char}s.
+     *
+     * @return Array.
+     * @throws IOException In case of error.
+     */
+    char[] readCharArray(int length) throws IOException;
+
+    /**
+     * Reads the requested number of bytes from the input stream into the given
+     * byte array. This method blocks until {@code len} bytes of input data have
+     * been read, end of stream is detected, or an exception is thrown. The
+     * number of bytes actually read, possibly zero, is returned. This method
+     * does not close the input stream.
+     *
+     * <p>In the case where end of stream is reached before {@code len} bytes
+     * have been read, then the actual number of bytes read will be returned.
+     * When this stream reaches end of stream, further invocations of this
+     * method will return zero.
+     *
+     * <p>If {@code len} is zero, then no bytes are read and {@code 0} is
+     * returned; otherwise, there is an attempt to read up to {@code len} bytes.
+     *
+     * <p>The first byte read is stored into element {@code b[off]}, the next
+     * one in to {@code b[off+1]}, and so on. The number of bytes read is, at
+     * most, equal to {@code len}. Let <i>k</i> be the number of bytes actually
+     * read; these bytes will be stored in elements {@code b[off]} through
+     * {@code b[off+}<i>k</i>{@code -1]}, leaving elements {@code b[off+}<i>k</i>
+     * {@code ]} through {@code b[off+len-1]} unaffected.
+     *
+     * <p>The behavior for the case where the input stream is <i>asynchronously
+     * closed</i>, or the thread interrupted during the read, is highly input
+     * stream specific, and therefore not specified.
+     *
+     * <p>If an I/O error occurs reading from the input stream, then it may do
+     * so after some, but not all, bytes of {@code b} have been updated with
+     * data from the input stream. Consequently the input stream and {@code b}
+     * may be in an inconsistent state. It is strongly recommended that the
+     * stream be promptly closed if an I/O error occurs.
+     *
+     * @param  b the byte array into which the data is read
+     * @param  off the start offset in {@code b} at which the data is written
+     * @param  len the maximum number of bytes to read
+     * @return the actual number of bytes read into the buffer
+     * @throws IOException if an I/O error occurs
+     * @throws NullPointerException if {@code b} is {@code null}
+     * @throws IndexOutOfBoundsException If {@code off} is negative, {@code len}
+     *     is negative, or {@code len} is greater than {@code b.length - off}
+     */
+    int readFewBytes(byte[] b, int off, int len) throws IOException;
+
+    /**
+     * Reads all remaining bytes from the input stream. This method blocks until
+     * all remaining bytes have been read and end of stream is detected, or an
+     * exception is thrown. This method does not close the input stream.
+     *
+     * <p>When this stream reaches end of stream, further invocations of this
+     * method will return an empty byte array.
+     *
+     * <p>Note that this method is intended for simple cases where it is
+     * convenient to read all bytes into a byte array. It is not intended for
+     * reading input streams with large amounts of data.
+     *
+     * <p>The behavior for the case where the input stream is <i>asynchronously
+     * closed</i>, or the thread interrupted during the read, is highly input
+     * stream specific, and therefore not specified.
+     *
+     * <p>If an I/O error occurs reading from the input stream, then it may do
+     * so after some, but not all, bytes have been read. Consequently the input
+     * stream may not be at end of stream and may be in an inconsistent state.
+     * It is strongly recommended that the stream be promptly closed if an I/O
+     * error occurs.
+     *
+     * @return a byte array containing the bytes read from this input stream
+     * @throws IOException if an I/O error occurs
+     * @throws OutOfMemoryError if an array of the required size cannot be
+     *     allocated.
+     */
+    byte[] readAllBytes() throws IOException;
+
+    /**
+     * Marks the current position in this input stream. A subsequent call to
+     * the <code>reset</code> method repositions this stream at the last marked
+     * position so that subsequent reads re-read the same bytes.
+     *
+     * <p>The <code>readlimit</code> arguments tells this input stream to
+     * allow that many bytes to be read before the mark position gets
+     * invalidated.
+     *
+     * <p>The general contract of <code>mark</code> is that, if the method
+     * <code>markSupported</code> returns <code>true</code>, the stream somehow
+     * remembers all the bytes read after the call to <code>mark</code> and
+     * stands ready to supply those same bytes again if and whenever the method
+     * <code>reset</code> is called.  However, the stream is not required to
+     * remember any data at all if more than <code>readlimit</code> bytes are
+     * read from the stream before <code>reset</code> is called.
+     *
+     * <p>Marking a closed stream should not have any effect on the stream.
+     *
+     * <p>The <code>mark</code> method of <code>InputStream</code> does
+     * nothing.
+     *
+     * @param   readLimit   the maximum limit of bytes that can be read before
+     *                      the mark position becomes invalid.
+     * @see     java.io.InputStream#reset()
+     */
+    void mark(int readLimit);
+
+    /**
+     * Returns an estimate of the number of bytes that can be read (or skipped
+     * over) from this input stream without blocking, which may be 0, or 0 when
+     * end of stream is detected.  The read might be on the same thread or
+     * another thread.  A single read or skip of this many bytes will not block,
+     * but may read or skip fewer bytes.
+     *
+     * <p>Note that while some implementations of {@code InputStream} will
+     * return the total number of bytes in the stream, many will not.  It is
+     * never correct to use the return value of this method to allocate
+     * a buffer intended to hold all data in this stream.
+     *
+     * <p>A subclass's implementation of this method may choose to throw an
+     * {@link IOException} if this input stream has been closed by invoking the
+     * {@link #close()} method.
+     *
+     * <p>The {@code available} method of {@code InputStream} always returns
+     * {@code 0}.
+     *
+     * <p>This method should be overridden by subclasses.
+     *
+     * @return     an estimate of the number of bytes that can be read (or
+     *     skipped over) from this input stream without blocking or
+     *     {@code 0} when it reaches the end of the input stream.
+     * @exception  IOException if an I/O error occurs.
+     */
+    int available() throws IOException;
+
+    /**
+     * Materializes an object from next bytes (the count is known upfront). The bytes used for materialization are consumed.
+     * This method is useful to avoid an allocation when first reading bytes to a byte array and then converting them
+     * to an object.
+     *
+     * @param bytesCount   number of bytes to consume and use for materialization
+     * @param materializer materializer to turn the bytes to an object
+     * @param <T>          materialized object type
+     * @return the materialized object
+     * @throws IOException if an I/O error occurs
+     */
+    <T> T materializeFromNextBytes(int bytesCount, Materializer<? extends T> materializer) throws IOException;
+
+    /**
+     * Materializer used to turn a region of a byte array to an object.
+     *
+     * @param <T> the type of a materialized object
+     */
+    interface Materializer<T> {
+        /**
+         * Materializes an object from a region of the given byte array.
+         *
+         * @param buffer byte array
+         * @param offset offset to that array where the bytes representing the object to be materialized start
+         * @param length length (in bytes) of the region to use for materialization
+         * @return the materialized object
+         */
+        T materialize(byte[] buffer, int offset, int length);
+    }
+}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/io/IgniteDataOutput.java b/modules/core/src/main/java/org/apache/ignite/internal/util/io/IgniteDataOutput.java
new file mode 100644
index 0000000..5012cde
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/io/IgniteDataOutput.java
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.util.io;
+
+import java.io.DataOutput;
+import java.io.IOException;
+import java.io.OutputStream;
+
+/**
+ * Extended data output.
+ */
+public interface IgniteDataOutput extends DataOutput {
+    /**
+     * Sets the {@link OutputStream} to push data to.
+     *
+     * @param out Underlying stream.
+     */
+    void outputStream(OutputStream out);
+
+    /**
+     * Returns a copy of the internal array.
+     *
+     * @return Copy of internal array shrunk to offset.
+     */
+    byte[] array();
+
+    /**
+     * Returns the internal array.
+     *
+     * @return Internal array.
+     */
+    byte[] internalArray();
+
+    /**
+     * Returns the current offset in the internal array.
+     *
+     * @return Offset.
+     */
+    int offset();
+
+    /**
+     * Adjusts the offset to the internal array.
+     *
+     * @param off Offset.
+     */
+    void offset(int off);
+
+    /**
+     * Makes the data output ready for reuse.
+     */
+    void cleanup();
+
+    /**
+     * Writes array of {@code byte}s.
+     *
+     * @param arr Array.
+     * @throws IOException In case of error.
+     */
+    void writeByteArray(byte[] arr) throws IOException;
+
+    /**
+     * Writes array of {@code short}s.
+     *
+     * @param arr Array.
+     * @throws IOException In case of error.
+     */
+    void writeShortArray(short[] arr) throws IOException;
+
+    /**
+     * Writes array of {@code int}s.
+     *
+     * @param arr Array.
+     * @throws IOException In case of error.
+     */
+    void writeIntArray(int[] arr) throws IOException;
+
+    /**
+     * Writes array of {@code long}s.
+     *
+     * @param arr Array.
+     * @throws IOException In case of error.
+     */
+    void writeLongArray(long[] arr) throws IOException;
+
+    /**
+     * Writes array of {@code float}s.
+     *
+     * @param arr Array.
+     * @throws IOException In case of error.
+     */
+    void writeFloatArray(float[] arr) throws IOException;
+
+    /**
+     * Writes array of {@code double}s.
+     *
+     * @param arr Array.
+     * @throws IOException In case of error.
+     */
+    void writeDoubleArray(double[] arr) throws IOException;
+
+    /**
+     * Writes array of {@code boolean}s.
+     *
+     * @param arr Array.
+     * @throws IOException In case of error.
+     */
+    void writeBooleanArray(boolean[] arr) throws IOException;
+
+    /**
+     * Writes array of {@code char}s.
+     *
+     * @param arr Array.
+     * @throws IOException In case of error.
+     */
+    void writeCharArray(char[] arr) throws IOException;
+
+    /**
+     * Flushes the output. This flushes the interlying {@link OutputStream} (if exists).
+     *
+     * @throws IOException  if something went wrong
+     */
+    void flush() throws IOException;
+}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/io/IgniteUnsafeDataInput.java b/modules/core/src/main/java/org/apache/ignite/internal/util/io/IgniteUnsafeDataInput.java
new file mode 100644
index 0000000..8c11bba
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/io/IgniteUnsafeDataInput.java
@@ -0,0 +1,800 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.util.io;
+
+import static org.apache.ignite.internal.util.GridUnsafe.BIG_ENDIAN;
+import static org.apache.ignite.internal.util.GridUnsafe.BYTE_ARR_OFF;
+import static org.apache.ignite.internal.util.GridUnsafe.CHAR_ARR_OFF;
+import static org.apache.ignite.internal.util.GridUnsafe.DOUBLE_ARR_OFF;
+import static org.apache.ignite.internal.util.GridUnsafe.FLOAT_ARR_OFF;
+import static org.apache.ignite.internal.util.GridUnsafe.INT_ARR_OFF;
+import static org.apache.ignite.internal.util.GridUnsafe.LONG_ARR_OFF;
+import static org.apache.ignite.internal.util.GridUnsafe.SHORT_ARR_OFF;
+
+import java.io.EOFException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.UTFDataFormatException;
+import java.util.Objects;
+import org.apache.ignite.internal.tostring.IgniteToStringBuilder;
+import org.apache.ignite.internal.tostring.IgniteToStringExclude;
+import org.apache.ignite.internal.util.FastTimestamps;
+import org.apache.ignite.internal.util.GridUnsafe;
+
+/**
+ * Data input based on {@code Unsafe} operations.
+ */
+public class IgniteUnsafeDataInput extends InputStream implements IgniteDataInput {
+    /** Maximum data block length. */
+    private static final int MAX_BLOCK_SIZE = 1024;
+
+    /** Length of char buffer (for reading strings). */
+    private static final int CHAR_BUF_SIZE = 256;
+
+    private static final long NO_AUTO_SHRINK = -1;
+
+    private final long shrinkCheckFrequencyMs;
+
+    /** Buffer for reading general/block data. */
+    @IgniteToStringExclude
+    private final byte[] utfBuf = new byte[MAX_BLOCK_SIZE];
+
+    /** Char buffer for fast string reads. */
+    @IgniteToStringExclude
+    private final char[] utfCharBuf = new char[CHAR_BUF_SIZE];
+
+    /** Current offset into buf. */
+    private int pos;
+
+    /** End offset of valid data in buf, or -1 if no more block data. */
+    private int end = -1;
+
+    /** Bytes. */
+    @IgniteToStringExclude
+    private byte[] buf;
+
+    /** Offset. */
+    private int off;
+
+    /** Max. */
+    private int max;
+
+    /** Underlying input stream. */
+    @IgniteToStringExclude
+    private InputStream in;
+
+    /** Buffer for reading from stream. */
+    @IgniteToStringExclude
+    private byte[] inBuf = new byte[1024];
+
+    /** Maximum message size. */
+    private int maxOff;
+
+    private int mark;
+
+    /** Last length check timestamp. */
+    private long lastAutoShrinkCheckTimestamp = FastTimestamps.coarseCurrentTimeMillis();
+
+    /**
+     * Creates a new input with auto-shrinking disabled and no internal buffer assigned.
+     */
+    public IgniteUnsafeDataInput() {
+        this(NO_AUTO_SHRINK);
+    }
+
+    /**
+     * Creates a new input with no internal buffer assigned.
+     *
+     * @param shrinkCheckFrequencyMs how often to check whether an underlying byte buffer needs to be shrunk
+     *                               (disables the auto-shrinking if it's -1)
+     */
+    public IgniteUnsafeDataInput(long shrinkCheckFrequencyMs) {
+        this.shrinkCheckFrequencyMs = shrinkCheckFrequencyMs;
+    }
+
+    /**
+     * Creates a new input with auto-shrinking disabled.
+     *
+     * @param bytes array to initially (before automatic resize) use as an internal buffer
+     */
+    public IgniteUnsafeDataInput(byte[] bytes) {
+        this(bytes, NO_AUTO_SHRINK);
+    }
+
+    /**
+     * Creates a new input.
+     *
+     * @param bytes array to initially (before automatic resize) use as an internal buffer
+     * @param shrinkCheckFrequencyMs how often to check whether an underlying byte buffer needs to be shrunk
+     *                               (disables the auto-shrinking if it's -1)
+     */
+    public IgniteUnsafeDataInput(byte[] bytes, long shrinkCheckFrequencyMs) {
+        this(shrinkCheckFrequencyMs);
+
+        bytes(bytes, bytes.length);
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public void bytes(byte[] bytes, int len) {
+        bytes(bytes, 0, len);
+    }
+
+    /**
+     * Sets the internal buffer.
+     *
+     * @param bytes Bytes.
+     * @param off Offset.
+     * @param len Length.
+     */
+    public void bytes(byte[] bytes, int off, int len) {
+        buf = bytes;
+
+        max = len;
+        this.off = off;
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public void inputStream(InputStream in) throws IOException {
+        this.in = in;
+
+        buf = inBuf;
+    }
+
+    /**
+     * Reads from stream to buffer. If stream is {@code null}, this method is no-op.
+     *
+     * @param size Number of bytes to read.
+     * @throws IOException In case of error.
+     */
+    private void fromStream(int size) throws IOException {
+        if (in == null) {
+            return;
+        }
+
+        maxOff = Math.max(maxOff, size);
+
+        // Increase size of buffer if needed.
+        if (size > inBuf.length) {
+            buf = inBuf = new byte[Math.max(inBuf.length << 1, size)]; // Grow.
+        } else if (isAutoShrinkEnabled()) {
+            long now = FastTimestamps.coarseCurrentTimeMillis();
+
+            if (now - lastAutoShrinkCheckTimestamp > shrinkCheckFrequencyMs) {
+                int halfSize = inBuf.length >> 1;
+
+                if (maxOff < halfSize) {
+                    byte[] newInBuf = new byte[halfSize]; // Shrink.
+
+                    System.arraycopy(inBuf, 0, newInBuf, 0, off);
+
+                    buf = inBuf = newInBuf;
+                }
+
+                maxOff = 0;
+                lastAutoShrinkCheckTimestamp = now;
+            }
+        }
+
+        off = 0;
+        max = 0;
+
+        while (max != size) {
+            int read = in.read(inBuf, max, size - max);
+
+            if (read == -1) {
+                throw new EOFException("End of stream reached: " + in);
+            }
+
+            max += read;
+        }
+    }
+
+    private boolean isAutoShrinkEnabled() {
+        return shrinkCheckFrequencyMs != NO_AUTO_SHRINK;
+    }
+
+    /**
+     * Advances the offset doing a check for whether we fell off the buffer edge.
+     *
+     * @param more Bytes to move forward.
+     * @return Old offset value.
+     * @throws IOException In case of error.
+     */
+    private int advanceOffset(int more) throws IOException {
+        int old = off;
+
+        off += more;
+
+        if (off > max) {
+            throw new EOFException("Attempt to read beyond the end of the stream "
+                    + "[pos=" + off + ", more=" + more + ", max=" + max + ']');
+        }
+
+        return old;
+    }
+
+    /** {@inheritDoc} */
+    @SuppressWarnings("NonSynchronizedMethodOverridesSynchronizedMethod")
+    @Override
+    public void reset() throws IOException {
+        if (in != null) {
+            in.reset();
+        } else {
+            off = mark;
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public void cleanup() throws IOException {
+        in = null;
+
+        off = 0;
+        max = 0;
+        mark = 0;
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public byte[] readByteArray(int arrSize) throws IOException {
+        fromStream(arrSize);
+
+        byte[] arr = new byte[arrSize];
+
+        System.arraycopy(buf, advanceOffset(arrSize), arr, 0, arrSize);
+
+        return arr;
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public short[] readShortArray(int arrSize) throws IOException {
+        int bytesToCp = arrSize << 1;
+
+        fromStream(bytesToCp);
+
+        short[] arr = new short[arrSize];
+
+        long off = BYTE_ARR_OFF + advanceOffset(bytesToCp);
+
+        if (BIG_ENDIAN) {
+            for (int i = 0; i < arr.length; i++) {
+                arr[i] = GridUnsafe.getShortLittleEndian(buf, off);
+
+                off += 2;
+            }
+        } else {
+            GridUnsafe.copyMemory(buf, off, arr, SHORT_ARR_OFF, bytesToCp);
+        }
+
+        return arr;
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public int[] readIntArray(int arrSize) throws IOException {
+        int bytesToCp = arrSize << 2;
+
+        fromStream(bytesToCp);
+
+        int[] arr = new int[arrSize];
+
+        long off = BYTE_ARR_OFF + advanceOffset(bytesToCp);
+
+        if (BIG_ENDIAN) {
+            for (int i = 0; i < arr.length; i++) {
+                arr[i] = GridUnsafe.getIntLittleEndian(buf, off);
+
+                off += 4;
+            }
+        } else {
+            GridUnsafe.copyMemory(buf, off, arr, INT_ARR_OFF, bytesToCp);
+        }
+
+        return arr;
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public double[] readDoubleArray(int arrSize) throws IOException {
+        int bytesToCp = arrSize << 3;
+
+        fromStream(bytesToCp);
+
+        double[] arr = new double[arrSize];
+
+        long off = BYTE_ARR_OFF + advanceOffset(bytesToCp);
+
+        if (BIG_ENDIAN) {
+            for (int i = 0; i < arr.length; i++) {
+                arr[i] = GridUnsafe.getDoubleLittleEndian(buf, off);
+
+                off += 8;
+            }
+        } else {
+            GridUnsafe.copyMemory(buf, off, arr, DOUBLE_ARR_OFF, bytesToCp);
+        }
+
+        return arr;
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public boolean[] readBooleanArray(int arrSize) throws IOException {
+        boolean[] vals = new boolean[arrSize];
+
+        for (int i = 0; i < arrSize; i++) {
+            vals[i] = readBoolean();
+        }
+
+        return vals;
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public char[] readCharArray(int arrSize) throws IOException {
+        int bytesToCp = arrSize << 1;
+
+        fromStream(bytesToCp);
+
+        char[] arr = new char[arrSize];
+
+        long off = BYTE_ARR_OFF + advanceOffset(bytesToCp);
+
+        if (BIG_ENDIAN) {
+            for (int i = 0; i < arr.length; i++) {
+                arr[i] = GridUnsafe.getCharLittleEndian(buf, off);
+
+                off += 2;
+            }
+        } else {
+            GridUnsafe.copyMemory(buf, off, arr, CHAR_ARR_OFF, bytesToCp);
+        }
+
+        return arr;
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public long[] readLongArray(int arrSize) throws IOException {
+        int bytesToCp = arrSize << 3;
+
+        fromStream(bytesToCp);
+
+        long[] arr = new long[arrSize];
+
+        long off = BYTE_ARR_OFF + advanceOffset(bytesToCp);
+
+        if (BIG_ENDIAN) {
+            for (int i = 0; i < arr.length; i++) {
+                arr[i] = GridUnsafe.getLongLittleEndian(buf, off);
+
+                off += 8;
+            }
+        } else {
+            GridUnsafe.copyMemory(buf, off, arr, LONG_ARR_OFF, bytesToCp);
+        }
+
+        return arr;
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public float[] readFloatArray(int arrSize) throws IOException {
+        int bytesToCp = arrSize << 2;
+
+        fromStream(bytesToCp);
+
+        float[] arr = new float[arrSize];
+
+        long off = BYTE_ARR_OFF + advanceOffset(bytesToCp);
+
+        if (BIG_ENDIAN) {
+            for (int i = 0; i < arr.length; i++) {
+                arr[i] = GridUnsafe.getFloatLittleEndian(buf, off);
+
+                off += 4;
+            }
+        } else {
+            GridUnsafe.copyMemory(buf, off, arr, FLOAT_ARR_OFF, bytesToCp);
+        }
+
+        return arr;
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public int readFewBytes(byte[] b, int off, int len) throws IOException {
+        Objects.checkFromIndexSize(off, len, b.length);
+
+        int n = 0;
+        while (n < len) {
+            int count = read(b, off + n, len - n);
+            if (count < 0) {
+                break;
+            }
+            n += count;
+        }
+        return n;
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public void readFully(byte[] b) throws IOException {
+        int len = b.length;
+
+        fromStream(len);
+
+        System.arraycopy(buf, advanceOffset(len), b, 0, len);
+    }
+
+    /** {@inheritDoc} */
+    @SuppressWarnings("NullableProblems")
+    @Override
+    public void readFully(byte[] b, int off, int len) throws IOException {
+        fromStream(len);
+
+        System.arraycopy(buf, advanceOffset(len), b, off, len);
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public int skipBytes(int n) {
+        if (off + n > max) {
+            n = max - off;
+        }
+
+        off += n;
+
+        return n;
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public boolean readBoolean() throws IOException {
+        fromStream(1);
+
+        return GridUnsafe.getBoolean(buf, BYTE_ARR_OFF + advanceOffset(1));
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public byte readByte() throws IOException {
+        fromStream(1);
+
+        return GridUnsafe.getByte(buf, BYTE_ARR_OFF + advanceOffset(1));
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public int readUnsignedByte() throws IOException {
+        return readByte() & 0xff;
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public short readShort() throws IOException {
+        fromStream(2);
+
+        long off = BYTE_ARR_OFF + advanceOffset(2);
+
+        return BIG_ENDIAN ? GridUnsafe.getShortLittleEndian(buf, off) : GridUnsafe.getShort(buf, off);
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public int readUnsignedShort() throws IOException {
+        return readShort() & 0xffff;
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public char readChar() throws IOException {
+        fromStream(2);
+
+        long off = BYTE_ARR_OFF + this.off;
+
+        char v = BIG_ENDIAN ? GridUnsafe.getCharLittleEndian(buf, off) : GridUnsafe.getChar(buf, off);
+
+        advanceOffset(2);
+
+        return v;
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public int readInt() throws IOException {
+        fromStream(4);
+
+        long off = BYTE_ARR_OFF + advanceOffset(4);
+
+        return BIG_ENDIAN ? GridUnsafe.getIntLittleEndian(buf, off) : GridUnsafe.getInt(buf, off);
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public long readLong() throws IOException {
+        fromStream(8);
+
+        long off = BYTE_ARR_OFF + advanceOffset(8);
+
+        return BIG_ENDIAN ? GridUnsafe.getLongLittleEndian(buf, off) : GridUnsafe.getLong(buf, off);
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public float readFloat() throws IOException {
+        int v = readInt();
+
+        return Float.intBitsToFloat(v);
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public double readDouble() throws IOException {
+        long v = readLong();
+
+        return Double.longBitsToDouble(v);
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public int read() throws IOException {
+        try {
+            return readUnsignedByte();
+        } catch (EOFException ignored) {
+            return -1;
+        }
+    }
+
+    /** {@inheritDoc} */
+    @SuppressWarnings("NullableProblems")
+    @Override
+    public int read(byte[] b, int off, int len) throws IOException {
+        if (b == null) {
+            throw new NullPointerException();
+        }
+
+        if (off < 0 || len < 0 || len > b.length - off) {
+            throw new IndexOutOfBoundsException();
+        }
+
+        if (len == 0) {
+            return 0;
+        }
+
+        if (in != null) {
+            return in.read(b, off, len);
+        } else {
+            int toRead = Math.min(len, max - this.off);
+            if (toRead <= 0) {
+                return -1;
+            }
+
+            System.arraycopy(buf, advanceOffset(toRead), b, off, toRead);
+
+            return toRead;
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public String readLine() throws IOException {
+        StringBuilder sb = new StringBuilder();
+
+        int b;
+
+        while ((b = read()) >= 0) {
+            char c = (char) b;
+
+            switch (c) {
+                case '\n':
+                    return sb.toString();
+
+                case '\r':
+                    b = read();
+
+                    if (b < 0 || b == '\n') {
+                        return sb.toString();
+                    } else {
+                        sb.append((char) b);
+                    }
+
+                    break;
+
+                default:
+                    sb.append(c);
+            }
+        }
+
+        return sb.toString();
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public String readUTF() throws IOException {
+        return readUtfBody(VarInts.readUnsignedInt(this));
+    }
+
+    /**
+     * Reads in the "body" (i.e., the UTF representation minus the 2-byte
+     * or 8-byte length header) of a UTF encoding, which occupies the next
+     * utfLen bytes.
+     *
+     * @param utfLen UTF encoding length.
+     * @return String.
+     * @throws IOException In case of error.
+     */
+    private String readUtfBody(long utfLen) throws IOException {
+        StringBuilder sbuf = new StringBuilder();
+
+        end = pos = 0;
+
+        while (utfLen > 0) {
+            int avail = end - pos;
+
+            if (avail >= 3 || (long) avail == utfLen) {
+                utfLen -= readUtfSpan(sbuf, utfLen);
+            } else {
+                // shift and refill buffer manually
+                if (avail > 0) {
+                    System.arraycopy(utfBuf, pos, utfBuf, 0, avail);
+                }
+
+                pos = 0;
+                end = (int) Math.min(MAX_BLOCK_SIZE, utfLen);
+
+                readFully(utfBuf, avail, end - avail);
+            }
+        }
+
+        return sbuf.toString();
+    }
+
+    /**
+     * Reads span of UTF-encoded characters out of internal buffer
+     * (starting at offset pos and ending at or before offset end),
+     * consuming no more than utfLen bytes. Appends read characters to
+     * sbuf. Returns the number of bytes consumed.
+     *
+     * @param sbuf String builder.
+     * @param utfLen UTF encoding length.
+     * @return Number of bytes consumed.
+     * @throws IOException In case of error.
+     */
+    @SuppressWarnings("ThrowFromFinallyBlock")
+    private long readUtfSpan(StringBuilder sbuf, long utfLen) throws IOException {
+        int cpos = 0;
+        int start = pos;
+        int avail = Math.min(end - pos, CHAR_BUF_SIZE);
+        int stop = pos + ((utfLen > avail) ? avail - 2 : (int) utfLen);
+        boolean outOfBounds = false;
+
+        try {
+            while (pos < stop) {
+                int b1 = utfBuf[pos++] & 0xFF;
+
+                int b2;
+                int b3;
+
+                switch (b1 >> 4) {
+                    case 0:
+                    case 1:
+                    case 2:
+                    case 3:
+                    case 4:
+                    case 5:
+                    case 6:
+                    case 7:
+                        // 1 byte format: 0xxxxxxx
+                        utfCharBuf[cpos++] = (char) b1;
+
+                        break;
+
+                    case 12:
+                    case 13:
+                        // 2 byte format: 110xxxxx 10xxxxxx
+                        b2 = utfBuf[pos++];
+
+                        if ((b2 & 0xC0) != 0x80) {
+                            throw new UTFDataFormatException();
+                        }
+
+                        utfCharBuf[cpos++] = (char) (((b1 & 0x1F) << 6) | (b2 & 0x3F));
+
+                        break;
+
+                    case 14:
+                        // 3 byte format: 1110xxxx 10xxxxxx 10xxxxxx
+                        b3 = utfBuf[pos + 1];
+                        b2 = utfBuf[pos];
+
+                        pos += 2;
+
+                        if ((b2 & 0xC0) != 0x80 || (b3 & 0xC0) != 0x80) {
+                            throw new UTFDataFormatException();
+                        }
+
+                        utfCharBuf[cpos++] = (char) (((b1 & 0x0F) << 12) | ((b2 & 0x3F) << 6) | (b3 & 0x3F));
+
+                        break;
+
+                    default:
+                        // 10xx xxxx, 1111 xxxx
+                        throw new UTFDataFormatException();
+                }
+            }
+        } catch (ArrayIndexOutOfBoundsException ignored) {
+            outOfBounds = true;
+        } finally {
+            if (outOfBounds || (pos - start) > utfLen) {
+                pos = start + (int) utfLen;
+
+                throw new UTFDataFormatException();
+            }
+        }
+
+        sbuf.append(utfCharBuf, 0, cpos);
+
+        return pos - start;
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public void mark(int readLimit) {
+        if (in != null) {
+            in.mark(readLimit);
+        } else {
+            mark = off;
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public boolean markSupported() {
+        return in == null || in.markSupported();
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public int available() throws IOException {
+        if (in != null) {
+            return in.available();
+        } else {
+            return Math.max(buf.length - off, 0);
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public <T> T materializeFromNextBytes(int bytesCount, Materializer<? extends T> materializer) throws IOException {
+        fromStream(bytesCount);
+
+        int prevOffset = advanceOffset(bytesCount);
+
+        return materializer.materialize(buf, prevOffset, bytesCount);
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public String toString() {
+        return IgniteToStringBuilder.toString(IgniteUnsafeDataInput.class, this);
+    }
+}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/io/IgniteUnsafeDataOutput.java b/modules/core/src/main/java/org/apache/ignite/internal/util/io/IgniteUnsafeDataOutput.java
new file mode 100644
index 0000000..6753179
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/io/IgniteUnsafeDataOutput.java
@@ -0,0 +1,702 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.util.io;
+
+import static org.apache.ignite.internal.util.GridUnsafe.BIG_ENDIAN;
+import static org.apache.ignite.internal.util.GridUnsafe.BYTE_ARR_OFF;
+import static org.apache.ignite.internal.util.GridUnsafe.CHAR_ARR_OFF;
+import static org.apache.ignite.internal.util.GridUnsafe.DOUBLE_ARR_OFF;
+import static org.apache.ignite.internal.util.GridUnsafe.FLOAT_ARR_OFF;
+import static org.apache.ignite.internal.util.GridUnsafe.INT_ARR_OFF;
+import static org.apache.ignite.internal.util.GridUnsafe.LONG_ARR_OFF;
+import static org.apache.ignite.internal.util.GridUnsafe.SHORT_ARR_OFF;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.Arrays;
+import org.apache.ignite.internal.tostring.IgniteToStringBuilder;
+import org.apache.ignite.internal.util.FastTimestamps;
+import org.apache.ignite.internal.util.GridUnsafe;
+import org.apache.ignite.internal.util.StringIntrospection;
+
+/**
+ * Data output based on {@code Unsafe} operations.
+ */
+public class IgniteUnsafeDataOutput extends OutputStream implements IgniteDataOutput {
+    /**
+     * Based on ByteArrayOutputStream#MAX_ARRAY_SIZE or many other similar constants in other classes.
+     * It's not safe to allocate more then this number of elements in byte array, because it can throw
+     * java.lang.OutOfMemoryError: Requested array size exceeds VM limit
+     */
+    private static final int MAX_BYTE_ARRAY_SIZE = Integer.MAX_VALUE - 8;
+
+    /** Length of char buffer (for writing strings). */
+    private static final int CHAR_BUF_SIZE = 256;
+
+    /** Char buffer for fast string writes. */
+    private final char[] cbuf = new char[CHAR_BUF_SIZE];
+
+    private static final long NO_AUTO_SHRINK = -1;
+
+    private final long shrinkCheckFrequencyMs;
+
+    /** Bytes. */
+    private byte[] bytes;
+
+    /** Offset. */
+    private int off;
+
+    /** Underlying output stream. */
+    private OutputStream out;
+
+    /** Maximum message size. */
+    private int maxOff;
+
+    /** Last length check timestamp. */
+    private long lastAutoShrinkCheckTimestamp = FastTimestamps.coarseCurrentTimeMillis();
+
+    /**
+     * Creates a new output with auto-shrinking disabled and no internal buffer allocated.
+     */
+    public IgniteUnsafeDataOutput() {
+        this(NO_AUTO_SHRINK);
+    }
+
+    /**
+     * Creates a new output with no internal buffer allocated.
+     *
+     * @param shrinkCheckFrequencyMs how often to check whether an underlying byte buffer needs to be shrunk
+     *                               (disables the auto-shrinking if it's -1)
+     */
+    public IgniteUnsafeDataOutput(long shrinkCheckFrequencyMs) {
+        if (shrinkCheckFrequencyMs != NO_AUTO_SHRINK && shrinkCheckFrequencyMs <= 0) {
+            throw new IllegalArgumentException(
+                    "Auto-shrink frequency must be either -1 (when auto-shrinking is disabled) or positive, but it's "
+                            + shrinkCheckFrequencyMs
+            );
+        }
+
+        this.shrinkCheckFrequencyMs = shrinkCheckFrequencyMs;
+    }
+
+    /**
+     * Creates a new output with auto-shrinking disabled.
+     *
+     * @param size size of an internal buffer to create
+     */
+    public IgniteUnsafeDataOutput(int size) {
+        this(size, NO_AUTO_SHRINK);
+    }
+
+    /**
+     * Creates a new output.
+     *
+     * @param size Size of an internal buffer to create
+     * @param shrinkCheckFrequencyMs how often to check whether an underlying byte buffer needs to be shrunk
+     *                               (disables the auto-shrinking if it's -1)
+     */
+    public IgniteUnsafeDataOutput(int size, long shrinkCheckFrequencyMs) {
+        this(shrinkCheckFrequencyMs);
+
+        bytes = new byte[size];
+    }
+
+    /**
+     * Sets the internal buffer.
+     *
+     * @param bytes Bytes.
+     * @param off Offset.
+     */
+    public void bytes(byte[] bytes, int off) {
+        this.bytes = bytes;
+        this.off = off;
+    }
+
+    /**
+     * Sets the {@link OutputStream} to which to push data.
+     *
+     * @param out Underlying output stream.
+     */
+    @Override
+    public void outputStream(OutputStream out) {
+        this.out = out;
+
+        off = 0;
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public byte[] array() {
+        byte[] bytes0 = new byte[off];
+
+        System.arraycopy(bytes, 0, bytes0, 0, off);
+
+        return bytes0;
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public byte[] internalArray() {
+        return bytes;
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public int offset() {
+        return off;
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public void offset(int off) {
+        this.off = off;
+    }
+
+    private void requestFreeSize(int size) throws IOException {
+        if (!canBeAllocated(off + size)) {
+            throw new IOException("Failed to allocate required memory (byte array size overflow detected) "
+                    + "[length=" + size + ", offset=" + off + ']');
+        }
+
+        size = off + size;
+
+        maxOff = Math.max(maxOff, size);
+
+        if (size > bytes.length) {
+            int newSize = size << 1;
+
+            if (!canBeAllocated(newSize)) {
+                newSize = MAX_BYTE_ARRAY_SIZE;
+            }
+
+            bytes = Arrays.copyOf(bytes, newSize); // Grow.
+        } else if (isAutoShrinkEnabled()) {
+            long now = FastTimestamps.coarseCurrentTimeMillis();
+
+            if (now - lastAutoShrinkCheckTimestamp > shrinkCheckFrequencyMs) {
+                int halfSize = bytes.length >> 1;
+
+                if (maxOff < halfSize) {
+                    bytes = Arrays.copyOf(bytes, halfSize); // Shrink.
+                }
+
+                maxOff = 0;
+                lastAutoShrinkCheckTimestamp = now;
+            }
+        }
+    }
+
+    private boolean isAutoShrinkEnabled() {
+        return shrinkCheckFrequencyMs != NO_AUTO_SHRINK;
+    }
+
+    /**
+     * Returns true if {@code new byte[size]} won't throw {@link OutOfMemoryError} given enough heap space.
+     *
+     * @param size Size of potential byte array to check.
+     * @return true if {@code new byte[size]} won't throw {@link OutOfMemoryError} given enough heap space.
+     * @see IgniteUnsafeDataOutput#MAX_BYTE_ARRAY_SIZE
+     */
+    private boolean canBeAllocated(long size) {
+        return 0 <= size && size <= MAX_BYTE_ARRAY_SIZE;
+    }
+
+    /**
+     * If there is an {@link OutputStream}, writes the requested amount of buffered data to it; otherwise,
+     * just advances the offset of the internal buffer.
+     *
+     * @param size Size.
+     * @throws IOException In case of error.
+     */
+    private void onWrite(int size) throws IOException {
+        if (out != null) {
+            out.write(bytes, 0, size);
+        } else {
+            off += size;
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public void write(int b) throws IOException {
+        writeByte(b);
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public void write(byte[] b) throws IOException {
+        requestFreeSize(b.length);
+
+        System.arraycopy(b, 0, bytes, off, b.length);
+
+        onWrite(b.length);
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public void write(byte[] b, int off, int len) throws IOException {
+        requestFreeSize(len);
+
+        System.arraycopy(b, off, bytes, this.off, len);
+
+        onWrite(len);
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public void writeDoubleArray(double[] arr) throws IOException {
+        checkArrayAllocationOverflow(8, arr.length, "double");
+
+        int bytesToCp = arr.length << 3;
+
+        requestFreeSize(bytesToCp);
+
+        if (BIG_ENDIAN) {
+            long off = BYTE_ARR_OFF + this.off;
+
+            for (double val : arr) {
+                GridUnsafe.putDoubleLittleEndian(bytes, off, val);
+
+                off += 8;
+            }
+        } else {
+            GridUnsafe.copyMemory(arr, DOUBLE_ARR_OFF, bytes, BYTE_ARR_OFF + off, bytesToCp);
+        }
+
+        onWrite(bytesToCp);
+    }
+
+    private void putInt(int val, long off) {
+        if (BIG_ENDIAN) {
+            GridUnsafe.putIntLittleEndian(bytes, off, val);
+        } else {
+            GridUnsafe.putInt(bytes, off, val);
+        }
+    }
+
+    /** {@inheritDoc} */
+    @SuppressWarnings("ForLoopReplaceableByForEach")
+    @Override
+    public void writeBooleanArray(boolean[] arr) throws IOException {
+        for (int i = 0; i < arr.length; i++) {
+            writeBoolean(arr[i]);
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public void writeCharArray(char[] arr) throws IOException {
+        checkArrayAllocationOverflow(2, arr.length, "char");
+
+        int bytesToCp = arr.length << 1;
+
+        requestFreeSize(bytesToCp);
+
+        if (BIG_ENDIAN) {
+            long off = BYTE_ARR_OFF + this.off;
+
+            for (char val : arr) {
+                GridUnsafe.putCharLittleEndian(bytes, off, val);
+
+                off += 2;
+            }
+        } else {
+            GridUnsafe.copyMemory(arr, CHAR_ARR_OFF, bytes, BYTE_ARR_OFF + off, bytesToCp);
+        }
+
+        onWrite(bytesToCp);
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public void writeLongArray(long[] arr) throws IOException {
+        checkArrayAllocationOverflow(8, arr.length, "long");
+
+        int bytesToCp = arr.length << 3;
+
+        requestFreeSize(bytesToCp);
+
+        if (BIG_ENDIAN) {
+            long off = BYTE_ARR_OFF + this.off;
+
+            for (long val : arr) {
+                GridUnsafe.putLongLittleEndian(bytes, off, val);
+
+                off += 8;
+            }
+        } else {
+            GridUnsafe.copyMemory(arr, LONG_ARR_OFF, bytes, BYTE_ARR_OFF + off, bytesToCp);
+        }
+
+        onWrite(bytesToCp);
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public void writeFloatArray(float[] arr) throws IOException {
+        checkArrayAllocationOverflow(4, arr.length, "float");
+
+        int bytesToCp = arr.length << 2;
+
+        requestFreeSize(bytesToCp);
+
+        if (BIG_ENDIAN) {
+            long off = BYTE_ARR_OFF + this.off;
+
+            for (float val : arr) {
+                GridUnsafe.putFloatLittleEndian(bytes, off, val);
+
+                off += 4;
+            }
+        } else {
+            GridUnsafe.copyMemory(arr, FLOAT_ARR_OFF, bytes, BYTE_ARR_OFF + off, bytesToCp);
+        }
+
+        onWrite(bytesToCp);
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public void cleanup() {
+        off = 0;
+
+        out = null;
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public void writeByteArray(byte[] arr) throws IOException {
+        requestFreeSize(arr.length);
+
+        System.arraycopy(arr, 0, bytes, off, arr.length);
+
+        onWrite(arr.length);
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public void writeShortArray(short[] arr) throws IOException {
+        checkArrayAllocationOverflow(2, arr.length, "short");
+
+        int bytesToCp = arr.length << 1;
+
+        requestFreeSize(bytesToCp);
+
+        if (BIG_ENDIAN) {
+            long off = BYTE_ARR_OFF + this.off;
+
+            for (short val : arr) {
+                GridUnsafe.putShortLittleEndian(bytes, off, val);
+
+                off += 2;
+            }
+        } else {
+            GridUnsafe.copyMemory(arr, SHORT_ARR_OFF, bytes, BYTE_ARR_OFF + off, bytesToCp);
+        }
+
+        onWrite(bytesToCp);
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public void writeIntArray(int[] arr) throws IOException {
+        checkArrayAllocationOverflow(4, arr.length, "int");
+
+        int bytesToCp = arr.length << 2;
+
+        requestFreeSize(bytesToCp);
+
+        if (BIG_ENDIAN) {
+            long off = BYTE_ARR_OFF + this.off;
+
+            for (int val : arr) {
+                GridUnsafe.putIntLittleEndian(bytes, off, val);
+
+                off += 4;
+            }
+        } else {
+            GridUnsafe.copyMemory(arr, INT_ARR_OFF, bytes, BYTE_ARR_OFF + off, bytesToCp);
+        }
+
+        onWrite(bytesToCp);
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public void close() throws IOException {
+        cleanup();
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public void writeBoolean(boolean v) throws IOException {
+        requestFreeSize(1);
+
+        GridUnsafe.putBoolean(bytes, BYTE_ARR_OFF + off, v);
+
+        onWrite(1);
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public void writeByte(int v) throws IOException {
+        requestFreeSize(1);
+
+        putByte((byte) v, BYTE_ARR_OFF + off);
+
+        onWrite(1);
+    }
+
+    private void putByte(byte val, long off) {
+        GridUnsafe.putByte(bytes, off, val);
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public void writeShort(int v) throws IOException {
+        requestFreeSize(2);
+
+        short val = (short) v;
+
+        long off = BYTE_ARR_OFF + this.off;
+
+        putShort(val, off);
+
+        onWrite(2);
+    }
+
+    private void putShort(short val, long off) {
+        if (BIG_ENDIAN) {
+            GridUnsafe.putShortLittleEndian(bytes, off, val);
+        } else {
+            GridUnsafe.putShort(bytes, off, val);
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public void writeChar(int v) throws IOException {
+        requestFreeSize(2);
+
+        char val = (char) v;
+
+        long off = BYTE_ARR_OFF + this.off;
+
+        if (BIG_ENDIAN) {
+            GridUnsafe.putCharLittleEndian(bytes, off, val);
+        } else {
+            GridUnsafe.putChar(bytes, off, val);
+        }
+
+        onWrite(2);
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public void writeInt(int v) throws IOException {
+        requestFreeSize(4);
+
+        long off = BYTE_ARR_OFF + this.off;
+
+        putInt(v, off);
+
+        onWrite(4);
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public void writeLong(long v) throws IOException {
+        requestFreeSize(8);
+
+        long off = BYTE_ARR_OFF + this.off;
+
+        if (BIG_ENDIAN) {
+            GridUnsafe.putLongLittleEndian(bytes, off, v);
+        } else {
+            GridUnsafe.putLong(bytes, off, v);
+        }
+
+        onWrite(8);
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public void writeFloat(float v) throws IOException {
+        int val = Float.floatToIntBits(v);
+
+        writeInt(val);
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public void writeDouble(double v) throws IOException {
+        long val = Double.doubleToLongBits(v);
+
+        writeLong(val);
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public void writeBytes(String s) throws IOException {
+        int len = s.length();
+
+        if (utfLength(s) == len) {
+            writeAsciiStringBytes(s);
+        } else {
+            for (int i = 0; i < len; i++) {
+                writeByte(s.charAt(i));
+            }
+        }
+    }
+
+    private void writeAsciiStringBytes(String s) throws IOException {
+        writeByteArray(StringIntrospection.fastAsciiBytes(s));
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public void writeChars(String s) throws IOException {
+        int len = s.length();
+
+        for (int i = 0; i < len; i++) {
+            writeChar(s.charAt(i));
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public void writeUTF(String s) throws IOException {
+        writeUtf(s, utfLength(s));
+    }
+
+    /**
+     * Writes the given string in UTF format. This method is used in
+     * situations where the UTF encoding length of the string is already
+     * known; specifying it explicitly avoids a prescan of the string to
+     * determine its UTF length.
+     *
+     * @param s String.
+     * @param utfLen UTF length encoding.
+     * @throws IOException In case of error.
+     */
+    private void writeUtf(String s, int utfLen) throws IOException {
+        VarInts.writeUnsignedInt(utfLen, this);
+
+        if (utfLen == s.length()) {
+            writeAsciiStringBytes(s);
+        } else {
+            writeUtfBody(s);
+        }
+    }
+
+    /**
+     * Check for possible arithmetic overflow when trying to serialize a humongous array.
+     *
+     * @param bytes Number of bytes in a single array element.
+     * @param arrLen Array length.
+     * @param type Type of an array.
+     * @throws IOException If oveflow presents and data corruption can occur.
+     */
+    private void checkArrayAllocationOverflow(int bytes, int arrLen, String type) throws IOException {
+        long bytesToAlloc = (long) arrLen * bytes;
+
+        if (!canBeAllocated(bytesToAlloc)) {
+            throw new IOException("Failed to allocate required memory for " + type + " array "
+                    + "(byte array size overflow detected) [length=" + arrLen + ']');
+        }
+    }
+
+    /**
+     * Returns the length in bytes of the UTF encoding of the given string.
+     *
+     * @param s String.
+     * @return UTF encoding length.
+     */
+    private int utfLength(String s) {
+        int len = s.length();
+        int utfLen = 0;
+
+        for (int off = 0; off < len; ) {
+            int size = Math.min(len - off, CHAR_BUF_SIZE);
+
+            s.getChars(off, off + size, cbuf, 0);
+
+            for (int pos = 0; pos < size; pos++) {
+                char c = cbuf[pos];
+
+                if (c >= 0x0001 && c <= 0x007F) {
+                    utfLen++;
+                } else {
+                    utfLen += c > 0x07FF ? 3 : 2;
+                }
+            }
+
+            off += size;
+        }
+
+        return utfLen;
+    }
+
+    /**
+     * Writes the "body" (i.e., the UTF representation minus the 2-byte or
+     * 8-byte length header) of the UTF encoding for the given string.
+     *
+     * @param s String.
+     * @throws IOException In case of error.
+     */
+    private void writeUtfBody(String s) throws IOException {
+        int len = s.length();
+
+        for (int off = 0; off < len; ) {
+            int csize = Math.min(len - off, CHAR_BUF_SIZE);
+
+            s.getChars(off, off + csize, cbuf, 0);
+
+            for (int cpos = 0; cpos < csize; cpos++) {
+                char c = cbuf[cpos];
+
+                if (c <= 0x007F && c != 0) {
+                    write(c);
+                } else if (c > 0x07FF) {
+                    write(0xE0 | ((c >> 12) & 0x0F));
+                    write(0x80 | ((c >> 6) & 0x3F));
+                    write(0x80 | ((c) & 0x3F));
+                } else {
+                    write(0xC0 | ((c >> 6) & 0x1F));
+                    write(0x80 | ((c) & 0x3F));
+                }
+            }
+
+            off += csize;
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public void flush() throws IOException {
+        if (out != null) {
+            out.flush();
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public String toString() {
+        return IgniteToStringBuilder.toString(IgniteUnsafeDataOutput.class, this);
+    }
+}
diff --git a/modules/network/src/main/java/org/apache/ignite/internal/network/serialization/marshal/VarInts.java b/modules/core/src/main/java/org/apache/ignite/internal/util/io/VarInts.java
similarity index 86%
rename from modules/network/src/main/java/org/apache/ignite/internal/network/serialization/marshal/VarInts.java
rename to modules/core/src/main/java/org/apache/ignite/internal/util/io/VarInts.java
index f3a70c4..98e7aff 100644
--- a/modules/network/src/main/java/org/apache/ignite/internal/network/serialization/marshal/VarInts.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/io/VarInts.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.ignite.internal.network.serialization.marshal;
+package org.apache.ignite.internal.util.io;
 
 import java.io.DataInput;
 import java.io.DataOutput;
@@ -24,23 +24,23 @@ import java.io.IOException;
 /**
  * Utils to read/write variable length ints.
  */
-class VarInts {
+public class VarInts {
     private VarInts() {
     }
 
     /**
-     * Writes a unsigned int using variable length format. If it's less than 0xFF, it's written as one byte.
+     * Writes an unsigned int using variable length format. If it's less than 0xFF, it's written as one byte.
      * If it's more than 0xFE, but less than 0xFFFF, it's written as 3 bytes: first one byte equal to 0xFF, then 2 bytes
      * as {@link DataOutput#writeShort(int)} writes them. Otherwise, it writes 3 0xFF bytes, then writes
      * {@link DataOutput#writeInt(int)}.
      * This may be beneficial for the cases when we need to write an unsigned int, but most of the time the values
-     * are small.
+     * are small (for example, when writing an array/collection/string length).
      *
      * @param value  value to write
      * @param output where to write to value to
      * @throws IOException if an I/O error occurs
      */
-    static void writeUnsignedInt(int value, DataOutput output) throws IOException {
+    public static void writeUnsignedInt(int value, DataOutput output) throws IOException {
         if (value < 0) {
             throw new IllegalArgumentException(value + " is negative");
         }
@@ -65,7 +65,7 @@ class VarInts {
      * @throws IOException if an I/O error occurs
      * @see #writeUnsignedInt(int, DataOutput)
      */
-    static int readUnsignedInt(DataInput input) throws IOException {
+    public static int readUnsignedInt(DataInput input) throws IOException {
         int first = input.readUnsignedByte();
         if (first < 0xFF) {
             return first;
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/util/StringIntrospectionTest.java b/modules/core/src/test/java/org/apache/ignite/internal/util/StringIntrospectionTest.java
new file mode 100644
index 0000000..8e96566
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/util/StringIntrospectionTest.java
@@ -0,0 +1,63 @@
+package org.apache.ignite.internal.util;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import static java.nio.charset.StandardCharsets.ISO_8859_1;
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.is;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import org.junit.jupiter.api.Test;
+
+class StringIntrospectionTest {
+
+    private static final String ASCII = "ASCII";
+    private static final String NOT_ASCII_BUT_LATIN1 = "Not ASCII: é";
+
+    @Test
+    void asciiStringsSupportFastGetLatin1Bytes() {
+        assertTrue(StringIntrospection.supportsFastGetLatin1Bytes(ASCII));
+    }
+
+    @Test
+    void latin1StringsSupportsFastGetLatin1Bytes() {
+        assertTrue(StringIntrospection.supportsFastGetLatin1Bytes(NOT_ASCII_BUT_LATIN1));
+    }
+
+    @Test
+    void nonLatin1StringsDoNotSupportFastGetLatin1Bytes() {
+        assertFalse(StringIntrospection.supportsFastGetLatin1Bytes("Not Latin1: кириллица"));
+    }
+
+    @Test
+    void fastAsciiBytesReturnsCorrectBytes() {
+        byte[] asciiBytes = StringIntrospection.fastAsciiBytes(ASCII);
+
+        assertThat(asciiBytes, is(equalTo(ASCII.getBytes(UTF_8))));
+    }
+
+    @Test
+    void fastLatin1BytesReturnsCorrectBytes() {
+        byte[] asciiBytes = StringIntrospection.fastLatin1Bytes(NOT_ASCII_BUT_LATIN1);
+
+        assertThat(asciiBytes, is(equalTo(NOT_ASCII_BUT_LATIN1.getBytes(ISO_8859_1))));
+    }
+}
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/util/io/IgniteTestIoUtils.java b/modules/core/src/test/java/org/apache/ignite/internal/util/io/IgniteTestIoUtils.java
new file mode 100644
index 0000000..f04bdbe
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/util/io/IgniteTestIoUtils.java
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.util.io;
+
+/**
+ * IO test utilities.
+ */
+public final class IgniteTestIoUtils {
+
+    /**
+     * Gets short value from byte array assuming that value stored in little-endian byte order.
+     *
+     * @param arr Byte array.
+     */
+    public static short getShortByByteLittleEndian(byte[] arr) {
+        return getShortByByteLittleEndian(arr, 0);
+    }
+
+    /**
+     * Gets short value from byte array assuming that value stored in little-endian byte order.
+     *
+     * @param arr Byte array.
+     * @param off Offset.
+     */
+    public static short getShortByByteLittleEndian(byte[] arr, int off) {
+        return (short) ((arr[off] & 0xff) | arr[off + 1] << 8);
+    }
+
+    /**
+     * Gets char value from byte array assuming that value stored in little-endian byte order.
+     *
+     * @param arr Byte array.
+     */
+    public static char getCharByByteLittleEndian(byte[] arr) {
+        return getCharByByteLittleEndian(arr, 0);
+    }
+
+    /**
+     * Gets char value from byte array assuming that value stored in little-endian byte order.
+     *
+     * @param arr Byte array.
+     * @param off Offset.
+     */
+    public static char getCharByByteLittleEndian(byte[] arr, int off) {
+        return (char) ((arr[off] & 0xff) | arr[off + 1] << 8);
+    }
+
+    /**
+     * Gets integer value from byte array assuming that value stored in little-endian byte order.
+     *
+     * @param arr Byte array.
+     */
+    public static int getIntByByteLittleEndian(byte[] arr) {
+        return getIntByByteLittleEndian(arr, 0);
+    }
+
+    /**
+     * Gets integer value from byte array assuming that value stored in little-endian byte order.
+     *
+     * @param arr Byte array.
+     * @param off Offset.
+     */
+    public static int getIntByByteLittleEndian(byte[] arr, int off) {
+        return ((int) arr[off] & 0xff) | ((int) arr[off + 1] & 0xff) << 8
+                | ((int) arr[off + 2] & 0xff) << 16 | ((int) arr[off + 3] & 0xff) << 24;
+    }
+
+    /**
+     * Gets long value from byte array assuming that value stored in little-endian byte order.
+     *
+     * @param arr Byte array.
+     */
+    public static long getLongByByteLittleEndian(byte[] arr) {
+        return getLongByByteLittleEndian(arr, 0);
+    }
+
+    /**
+     * Gets long value from byte array assuming that value stored in little-endian byte order.
+     *
+     * @param arr Byte array.
+     * @param off Offset.
+     */
+    public static long getLongByByteLittleEndian(byte[] arr, int off) {
+        return ((long) arr[off] & 0xff) | ((long) arr[off + 1] & 0xff) << 8 | ((long) arr[off + 2] & 0xff) << 16
+                | ((long) arr[off + 3] & 0xff) << 24 | ((long) arr[off + 4] & 0xff) << 32 | ((long) arr[off + 5] & 0xff) << 40
+                | ((long) arr[off + 6] & 0xff) << 48 | ((long) arr[off + 7] & 0xff) << 56;
+    }
+
+    /**
+     * Gets float value from byte array assuming that value stored in little-endian byte order.
+     *
+     * @param arr Byte array.
+     */
+    public static float getFloatByByteLittleEndian(byte[] arr) {
+        return getFloatByByteLittleEndian(arr, 0);
+    }
+
+    /**
+     * Gets float value from byte array assuming that value stored in little-endian byte order.
+     *
+     * @param arr Byte array.
+     * @param off Offset.
+     */
+    public static float getFloatByByteLittleEndian(byte[] arr, int off) {
+        return Float.intBitsToFloat(getIntByByteLittleEndian(arr, off));
+    }
+
+    /**
+     * Gets double value from byte array assuming that value stored in little-endian byte order.
+     *
+     * @param arr Byte array.
+     */
+    public static double getDoubleByByteLittleEndian(byte[] arr) {
+        return getDoubleByByteLittleEndian(arr, 0);
+    }
+
+    /**
+     * Gets double value from byte array assuming that value stored in little-endian byte order.
+     *
+     * @param arr Byte array.
+     * @param off Offset.
+     */
+    public static double getDoubleByByteLittleEndian(byte[] arr, int off) {
+        return Double.longBitsToDouble(getLongByByteLittleEndian(arr, off));
+    }
+
+    /**
+     * Ensure singleton.
+     */
+    private IgniteTestIoUtils() {
+        // No-op.
+    }
+}
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/util/io/IgniteUnsafeDataInputOutputByteOrderTest.java b/modules/core/src/test/java/org/apache/ignite/internal/util/io/IgniteUnsafeDataInputOutputByteOrderTest.java
new file mode 100644
index 0000000..afdfe79
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/util/io/IgniteUnsafeDataInputOutputByteOrderTest.java
@@ -0,0 +1,240 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.util.io;
+
+import static org.apache.ignite.internal.util.io.IgniteTestIoUtils.getCharByByteLittleEndian;
+import static org.apache.ignite.internal.util.io.IgniteTestIoUtils.getDoubleByByteLittleEndian;
+import static org.apache.ignite.internal.util.io.IgniteTestIoUtils.getFloatByByteLittleEndian;
+import static org.apache.ignite.internal.util.io.IgniteTestIoUtils.getIntByByteLittleEndian;
+import static org.apache.ignite.internal.util.io.IgniteTestIoUtils.getLongByByteLittleEndian;
+import static org.apache.ignite.internal.util.io.IgniteTestIoUtils.getShortByByteLittleEndian;
+import static org.junit.jupiter.api.Assertions.assertArrayEquals;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+import java.io.ByteArrayInputStream;
+import java.util.Random;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+/**
+ * Ignite unsafe data input/output byte order sanity tests.
+ */
+class IgniteUnsafeDataInputOutputByteOrderTest {
+    /** Array length. */
+    private static final int ARR_LEN = 16;
+
+    /** Length bytes. */
+    private static final int LEN_BYTES = 0;
+
+    /** Rnd. */
+    private static final Random RND = new Random();
+
+    /** Out. */
+    private IgniteUnsafeDataOutput out;
+
+    /** In. */
+    private IgniteUnsafeDataInput in;
+
+    @BeforeEach
+    public void setUp() throws Exception {
+        out = new IgniteUnsafeDataOutput(16 * 8 + LEN_BYTES);
+        in = new IgniteUnsafeDataInput();
+        in.inputStream(new ByteArrayInputStream(out.internalArray()));
+    }
+
+    @AfterEach
+    public void tearDown() throws Exception {
+        in.close();
+        out.close();
+    }
+
+    @Test
+    public void testShort() throws Exception {
+        short val = (short) RND.nextLong();
+
+        out.writeShort(val);
+
+        assertEquals(val, IgniteTestIoUtils.getShortByByteLittleEndian(out.internalArray()));
+        assertEquals(val, in.readShort());
+    }
+
+    @Test
+    public void testShortArray() throws Exception {
+        short[] arr = new short[ARR_LEN];
+
+        for (int i = 0; i < ARR_LEN; i++) {
+            arr[i] = (short) RND.nextLong();
+        }
+
+        out.writeShortArray(arr);
+
+        byte[] outArr = out.internalArray();
+
+        for (int i = 0; i < ARR_LEN; i++) {
+            assertEquals(arr[i], getShortByByteLittleEndian(outArr, i * 2 + LEN_BYTES));
+        }
+
+        assertArrayEquals(arr, in.readShortArray(ARR_LEN));
+    }
+
+    @Test
+    public void testChar() throws Exception {
+        char val = (char) RND.nextLong();
+
+        out.writeChar(val);
+
+        assertEquals(val, IgniteTestIoUtils.getCharByByteLittleEndian(out.internalArray()));
+        assertEquals(val, in.readChar());
+    }
+
+    @Test
+    public void testCharArray() throws Exception {
+        char[] arr = new char[ARR_LEN];
+
+        for (int i = 0; i < ARR_LEN; i++) {
+            arr[i] = (char) RND.nextLong();
+        }
+
+        out.writeCharArray(arr);
+
+        byte[] outArr = out.internalArray();
+
+        for (int i = 0; i < ARR_LEN; i++) {
+            assertEquals(arr[i], getCharByByteLittleEndian(outArr, i * 2 + LEN_BYTES));
+        }
+
+        assertArrayEquals(arr, in.readCharArray(ARR_LEN));
+    }
+
+    @Test
+    public void testInt() throws Exception {
+        int val = RND.nextInt();
+
+        out.writeInt(val);
+
+        assertEquals(val, IgniteTestIoUtils.getIntByByteLittleEndian(out.internalArray()));
+        assertEquals(val, in.readInt());
+    }
+
+    @Test
+    public void testIntArray() throws Exception {
+        int[] arr = new int[ARR_LEN];
+
+        for (int i = 0; i < ARR_LEN; i++) {
+            arr[i] = RND.nextInt();
+        }
+
+        out.writeIntArray(arr);
+
+        byte[] outArr = out.internalArray();
+
+        for (int i = 0; i < ARR_LEN; i++) {
+            assertEquals(arr[i], getIntByByteLittleEndian(outArr, i * 4 + LEN_BYTES));
+        }
+
+        assertArrayEquals(arr, in.readIntArray(ARR_LEN));
+    }
+
+    @Test
+    public void testLong() throws Exception {
+        long val = RND.nextLong();
+
+        out.writeLong(val);
+
+        assertEquals(val, IgniteTestIoUtils.getLongByByteLittleEndian(out.internalArray()));
+        assertEquals(val, in.readLong());
+    }
+
+    @Test
+    public void testLongArray() throws Exception {
+        long[] arr = new long[ARR_LEN];
+
+        for (int i = 0; i < ARR_LEN; i++) {
+            arr[i] = RND.nextLong();
+        }
+
+        out.writeLongArray(arr);
+
+        byte[] outArr = out.internalArray();
+
+        for (int i = 0; i < ARR_LEN; i++) {
+            assertEquals(arr[i], getLongByByteLittleEndian(outArr, i * 8 + LEN_BYTES));
+        }
+
+        assertArrayEquals(arr, in.readLongArray(ARR_LEN));
+    }
+
+    @Test
+    public void testFloat() throws Exception {
+        float val = RND.nextFloat();
+
+        out.writeFloat(val);
+
+        assertEquals(val, IgniteTestIoUtils.getFloatByByteLittleEndian(out.internalArray()), 0);
+        assertEquals(val, in.readFloat(), 0);
+    }
+
+    @Test
+    public void testFloatArray() throws Exception {
+        float[] arr = new float[ARR_LEN];
+
+        for (int i = 0; i < ARR_LEN; i++) {
+            arr[i] = RND.nextFloat();
+        }
+
+        out.writeFloatArray(arr);
+
+        byte[] outArr = out.internalArray();
+
+        for (int i = 0; i < ARR_LEN; i++) {
+            assertEquals(arr[i], getFloatByByteLittleEndian(outArr, i * 4 + LEN_BYTES), 0);
+        }
+
+        assertArrayEquals(arr, in.readFloatArray(ARR_LEN), 0);
+    }
+
+    @Test
+    public void testDouble() throws Exception {
+        double val = RND.nextDouble();
+
+        out.writeDouble(val);
+
+        assertEquals(val, IgniteTestIoUtils.getDoubleByByteLittleEndian(out.internalArray()), 0);
+        assertEquals(val, in.readDouble(), 0);
+    }
+
+    @Test
+    public void testDoubleArray() throws Exception {
+        double[] arr = new double[ARR_LEN];
+
+        for (int i = 0; i < ARR_LEN; i++) {
+            arr[i] = RND.nextDouble();
+        }
+
+        out.writeDoubleArray(arr);
+
+        byte[] outArr = out.internalArray();
+
+        for (int i = 0; i < ARR_LEN; i++) {
+            assertEquals(arr[i], getDoubleByByteLittleEndian(outArr, i * 8 + LEN_BYTES), 0);
+        }
+
+        assertArrayEquals(arr, in.readDoubleArray(ARR_LEN), 0);
+    }
+}
diff --git a/modules/network/src/main/java/org/apache/ignite/internal/network/serialization/marshal/ValueWriter.java b/modules/core/src/test/java/org/apache/ignite/internal/util/io/IgniteUnsafeDataInputTest.java
similarity index 54%
copy from modules/network/src/main/java/org/apache/ignite/internal/network/serialization/marshal/ValueWriter.java
copy to modules/core/src/test/java/org/apache/ignite/internal/util/io/IgniteUnsafeDataInputTest.java
index f7b2ede..8038459 100644
--- a/modules/network/src/main/java/org/apache/ignite/internal/network/serialization/marshal/ValueWriter.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/util/io/IgniteUnsafeDataInputTest.java
@@ -15,23 +15,26 @@
  * limitations under the License.
  */
 
-package org.apache.ignite.internal.network.serialization.marshal;
+package org.apache.ignite.internal.util.io;
 
-import java.io.DataOutputStream;
-import java.io.IOException;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.lessThan;
 
-/**
- * Knows how to write a value to a {@link DataOutputStream}.
- */
-interface ValueWriter<T> {
-    /**
-     * Writes the given value to a {@link DataOutputStream}.
-     *
-     * @param value     value to write
-     * @param output    where to write to
-     * @param context   marshalling context
-     * @throws IOException      if an I/O problem occurs
-     * @throws MarshalException if another problem occurs
-     */
-    void write(T value, DataOutputStream output, MarshallingContext context) throws IOException, MarshalException;
+import org.junit.jupiter.api.Test;
+
+class IgniteUnsafeDataInputTest {
+    @SuppressWarnings("ResultOfMethodCallIgnored")
+    @Test
+    void readReturnsNegativeValueWhenEndOfStreamIsReached() throws Exception {
+        var input = new IgniteUnsafeDataInput();
+        input.bytes(new byte[]{1, 2, 3}, 3);
+
+        byte[] bytes = new byte[4096];
+        input.read(bytes, 0, bytes.length);
+
+        int read = input.read(bytes, 0, bytes.length);
+
+        assertThat(read, is(lessThan(0)));
+    }
 }
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/util/io/IgniteUnsafeDataOutputArraySizingTest.java b/modules/core/src/test/java/org/apache/ignite/internal/util/io/IgniteUnsafeDataOutputArraySizingTest.java
new file mode 100644
index 0000000..58b0509
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/util/io/IgniteUnsafeDataOutputArraySizingTest.java
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.util.io;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import java.util.function.BooleanSupplier;
+import org.apache.ignite.internal.testframework.IgniteTestUtils;
+import org.junit.jupiter.api.Test;
+
+/**
+ * Test for {@link IgniteUnsafeDataOutput} concerning internal buffer sizing.
+ */
+class IgniteUnsafeDataOutputArraySizingTest {
+    /** Small array. */
+    private static final byte[] SMALL = new byte[32];
+
+    /** Big array. */
+    private static final byte[] BIG = new byte[2048];
+
+    /** Buffer timeout. */
+    private static final long BUFFER_TIMEOUT = 1000;
+
+    /** Wait timeout is bigger then buffer timeout to prevent failures due to time measurement error. */
+    private static final long WAIT_BUFFER_TIMEOUT = BUFFER_TIMEOUT + BUFFER_TIMEOUT / 2;
+
+    @Test
+    public void testShrink() throws Exception {
+        final IgniteUnsafeDataOutput out = new IgniteUnsafeDataOutput(512, BUFFER_TIMEOUT);
+
+        assertTrue(IgniteTestUtils.waitForCondition(new WriteAndCheckPredicate(out, SMALL, 256), WAIT_BUFFER_TIMEOUT));
+        assertTrue(IgniteTestUtils.waitForCondition(new WriteAndCheckPredicate(out, SMALL, 128), WAIT_BUFFER_TIMEOUT));
+        assertTrue(IgniteTestUtils.waitForCondition(new WriteAndCheckPredicate(out, SMALL, 64), WAIT_BUFFER_TIMEOUT));
+        assertFalse(IgniteTestUtils.waitForCondition(new WriteAndCheckPredicate(out, SMALL, 32), WAIT_BUFFER_TIMEOUT));
+        assertTrue(IgniteTestUtils.waitForCondition(new WriteAndCheckPredicate(out, SMALL, 64), WAIT_BUFFER_TIMEOUT));
+    }
+
+    @Test
+    public void testGrow() throws Exception {
+        IgniteUnsafeDataOutput out = new IgniteUnsafeDataOutput(512);
+
+        out.write(BIG);
+        out.cleanup();
+
+        assertEquals(4096, out.internalArray().length);
+    }
+
+    @Test
+    public void testChanged1() throws Exception {
+        IgniteUnsafeDataOutput out = new IgniteUnsafeDataOutput(512, BUFFER_TIMEOUT);
+
+        for (int i = 0; i < 100; i++) {
+            Thread.sleep(100);
+
+            out.write(SMALL);
+            out.cleanup();
+            out.write(BIG);
+            out.cleanup();
+        }
+
+        assertEquals(4096, out.internalArray().length);
+    }
+
+    @Test
+    public void testChanged2() throws Exception {
+        final IgniteUnsafeDataOutput out = new IgniteUnsafeDataOutput(512, BUFFER_TIMEOUT);
+
+        assertTrue(IgniteTestUtils.waitForCondition(new WriteAndCheckPredicate(out, SMALL, 256), WAIT_BUFFER_TIMEOUT));
+
+        out.write(BIG);
+        out.cleanup();
+        assertEquals(4096, out.internalArray().length);
+
+        assertTrue(IgniteTestUtils.waitForCondition(new WriteAndCheckPredicate(out, SMALL, 4096), WAIT_BUFFER_TIMEOUT));
+        assertTrue(IgniteTestUtils.waitForCondition(new WriteAndCheckPredicate(out, SMALL, 2048), 2 * WAIT_BUFFER_TIMEOUT));
+    }
+
+    private static class WriteAndCheckPredicate implements BooleanSupplier {
+        final IgniteUnsafeDataOutput out;
+
+        final byte[] bytes;
+
+        final int len;
+
+        WriteAndCheckPredicate(IgniteUnsafeDataOutput out, byte[] bytes, int len) {
+            this.out = out;
+            this.bytes = bytes;
+            this.len = len;
+        }
+
+        /** {@inheritDoc} */
+        @Override
+        public boolean getAsBoolean() {
+            try {
+                out.write(bytes);
+                out.cleanup();
+
+                System.out.println("L=" + out.internalArray().length);
+
+                return out.internalArray().length == len;
+            } catch (Exception e) {
+                throw new RuntimeException(e);
+            }
+        }
+    }
+}
diff --git a/modules/network/src/test/java/org/apache/ignite/internal/network/serialization/marshal/VarIntsTest.java b/modules/core/src/test/java/org/apache/ignite/internal/util/io/VarIntsTest.java
similarity index 98%
rename from modules/network/src/test/java/org/apache/ignite/internal/network/serialization/marshal/VarIntsTest.java
rename to modules/core/src/test/java/org/apache/ignite/internal/util/io/VarIntsTest.java
index 744f631..f0c62d5 100644
--- a/modules/network/src/test/java/org/apache/ignite/internal/network/serialization/marshal/VarIntsTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/util/io/VarIntsTest.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.ignite.internal.network.serialization.marshal;
+package org.apache.ignite.internal.util.io;
 
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.Matchers.is;
diff --git a/modules/network/pom.xml b/modules/network/pom.xml
index 379dec6..c5c97c4 100644
--- a/modules/network/pom.xml
+++ b/modules/network/pom.xml
@@ -71,6 +71,24 @@
 
         <!-- Test dependencies -->
         <dependency>
+            <groupId>org.openjdk.jmh</groupId>
+            <artifactId>jmh-core</artifactId>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.openjdk.jmh</groupId>
+            <artifactId>jmh-generator-annprocess</artifactId>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>com.esotericsoftware</groupId>
+            <artifactId>kryo</artifactId>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
             <groupId>org.apache.ignite</groupId>
             <artifactId>ignite-configuration</artifactId>
             <type>test-jar</type>
@@ -146,6 +164,12 @@
                         <artifactId>ignite-network-annotation-processor</artifactId>
                         <version>${project.version}</version>
                     </dependency>
+
+                    <dependency>
+                        <groupId>org.openjdk.jmh</groupId>
+                        <artifactId>jmh-generator-annprocess</artifactId>
+                        <version>${jmh.framework.version}</version>
+                    </dependency>
                 </dependencies>
                 <configuration>
                     <annotationProcessorPaths>
@@ -154,6 +178,11 @@
                             <artifactId>ignite-network-annotation-processor</artifactId>
                             <version>${project.version}</version>
                         </path>
+                        <path>
+                            <groupId>org.openjdk.jmh</groupId>
+                            <artifactId>jmh-generator-annprocess</artifactId>
+                            <version>${jmh.framework.version}</version>
+                        </path>
                     </annotationProcessorPaths>
                 </configuration>
             </plugin>
diff --git a/modules/network/src/main/java/org/apache/ignite/internal/network/direct/stream/DirectByteBufferStreamImplV1.java b/modules/network/src/main/java/org/apache/ignite/internal/network/direct/stream/DirectByteBufferStreamImplV1.java
index e84a474..7dd46b8 100644
--- a/modules/network/src/main/java/org/apache/ignite/internal/network/direct/stream/DirectByteBufferStreamImplV1.java
+++ b/modules/network/src/main/java/org/apache/ignite/internal/network/direct/stream/DirectByteBufferStreamImplV1.java
@@ -1380,7 +1380,7 @@ public class DirectByteBufferStreamImplV1 implements DirectByteBufferStream {
                     MarshalledObject res = serializationService.writeMarshallable(object);
                     marshallable = res.bytes();
                     // Get descriptors that were not previously sent to the remote node
-                    descriptors = serializationService.createClassDescriptorsMessages(res.usedDescriptors());
+                    descriptors = serializationService.createClassDescriptorsMessages(res.usedDescriptorIds());
                 }
 
                 writeCollection(descriptors, MessageCollectionItemType.MSG, writer);
diff --git a/modules/network/src/main/java/org/apache/ignite/internal/network/serialization/BuiltInType.java b/modules/network/src/main/java/org/apache/ignite/internal/network/serialization/BuiltInType.java
index 56beee6..30c523c 100644
--- a/modules/network/src/main/java/org/apache/ignite/internal/network/serialization/BuiltInType.java
+++ b/modules/network/src/main/java/org/apache/ignite/internal/network/serialization/BuiltInType.java
@@ -76,7 +76,8 @@ public enum BuiltInType {
     NULL(38, Null.class),
     REFERENCE(39, ReferencePlaceholder.class),
     CLASS(40, Class.class),
-    PROXY(41, Proxy.class)
+    PROXY(41, Proxy.class),
+    STRING_LATIN1(42, StringLatin1Placeholder.class)
     ;
 
     /**
@@ -136,6 +137,6 @@ public enum BuiltInType {
     private static class ReferencePlaceholder {
     }
 
-    private static class NotNullPlaceholder {
+    private static class StringLatin1Placeholder {
     }
 }
diff --git a/modules/network/src/main/java/org/apache/ignite/internal/network/serialization/BuiltInTypeIds.java b/modules/network/src/main/java/org/apache/ignite/internal/network/serialization/BuiltInTypeIds.java
index fe1380b..09c074b 100644
--- a/modules/network/src/main/java/org/apache/ignite/internal/network/serialization/BuiltInTypeIds.java
+++ b/modules/network/src/main/java/org/apache/ignite/internal/network/serialization/BuiltInTypeIds.java
@@ -30,4 +30,13 @@ public class BuiltInTypeIds {
     public static final int DOUBLE = 10;
     public static final int BOOLEAN = 12;
     public static final int CHAR = 14;
+
+    public static final int BYTE_ARRAY = 21;
+    public static final int SHORT_ARRAY = 22;
+    public static final int INT_ARRAY = 23;
+    public static final int FLOAT_ARRAY = 24;
+    public static final int LONG_ARRAY = 25;
+    public static final int DOUBLE_ARRAY = 26;
+    public static final int BOOLEAN_ARRAY = 27;
+    public static final int CHAR_ARRAY = 28;
 }
diff --git a/modules/network/src/main/java/org/apache/ignite/internal/network/serialization/ClassDescriptor.java b/modules/network/src/main/java/org/apache/ignite/internal/network/serialization/ClassDescriptor.java
index 16c9602..035388d 100644
--- a/modules/network/src/main/java/org/apache/ignite/internal/network/serialization/ClassDescriptor.java
+++ b/modules/network/src/main/java/org/apache/ignite/internal/network/serialization/ClassDescriptor.java
@@ -23,6 +23,8 @@ import it.unimi.dsi.fastutil.objects.Object2IntMap;
 import it.unimi.dsi.fastutil.objects.Object2IntMaps;
 import it.unimi.dsi.fastutil.objects.Object2IntOpenHashMap;
 import java.lang.reflect.Modifier;
+import java.util.ArrayList;
+import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.function.Function;
@@ -99,6 +101,8 @@ public class ClassDescriptor {
     /** Indices of non-primitive fields in the object fields array. */
     private Object2IntMap<String> objectFieldIndices;
 
+    private final List<ClassDescriptor> lineage;
+
     private final SpecialSerializationMethods serializationMethods;
 
     /**
@@ -124,6 +128,8 @@ public class ClassDescriptor {
         fieldNullsBitmapSize = computeFieldNullsBitmapSize(fields);
         fieldNullsBitmapIndices = computeFieldNullsBitmapIndices(fields);
 
+        lineage = computeLineage(this);
+
         serializationMethods = new SpecialSerializationMethodsImpl(this);
     }
 
@@ -166,6 +172,20 @@ public class ClassDescriptor {
         return Object2IntMaps.unmodifiable(map);
     }
 
+    private static List<ClassDescriptor> computeLineage(ClassDescriptor descriptor) {
+        List<ClassDescriptor> descriptors = new ArrayList<>();
+
+        ClassDescriptor currentDesc = descriptor;
+        while (currentDesc != null) {
+            descriptors.add(currentDesc);
+            currentDesc = currentDesc.superClassDescriptor();
+        }
+
+        Collections.reverse(descriptors);
+
+        return List.copyOf(descriptors);
+    }
+
     /**
      * Returns descriptor id.
      *
@@ -529,6 +549,15 @@ public class ClassDescriptor {
     }
 
     /**
+     * Returns the lineage (all the ancestors, from the progenitor (excluding Object) down the line, including this descriptor).
+     *
+     * @return ancestors from the progenitor (excluding Object) down the line, plus this descriptor
+     */
+    public List<ClassDescriptor> lineage() {
+        return lineage;
+    }
+
+    /**
      * Returns {@code true} if the descriptor describes an enum class.
      *
      * @return {@code true} if the descriptor describes an enum class
@@ -538,9 +567,19 @@ public class ClassDescriptor {
     }
 
     /**
+     * Returns {@code true} if the descriptor describes a String that is represented with Latin-1 internally.
+     * Needed to apply an optimization.
+     *
+     * @return {@code true} if the descriptor describes a String that is represented with Latin-1 internally
+     */
+    public boolean isLatin1String() {
+        return descriptorId == BuiltInType.STRING_LATIN1.descriptorId();
+    }
+
+    /**
      * Returns {@code true} if a field (or array item) of the described class can only host (at runtime) instances of this type
-     * (and not subtypes), so the runtime type is known upfront. This is also true for enums, even though technically their values
-     * might have subtypes; but we serialize them using their names, so we still treat the type as known upfront.
+     * (and not subtypes), so the runtime marshalling type is known upfront. This is also true for enums, even though technically
+     * their values might have subtypes; but we serialize them using their names, so we still treat the type as known upfront.
      *
      * @return {@code true} if a field (or array item) of the described class can only host (at runtime) instances of the concrete type
      *     that is known upfront
diff --git a/modules/network/src/main/java/org/apache/ignite/internal/network/serialization/ClassDescriptorRegistry.java b/modules/network/src/main/java/org/apache/ignite/internal/network/serialization/ClassDescriptorRegistry.java
index c90f13c..2b36769 100644
--- a/modules/network/src/main/java/org/apache/ignite/internal/network/serialization/ClassDescriptorRegistry.java
+++ b/modules/network/src/main/java/org/apache/ignite/internal/network/serialization/ClassDescriptorRegistry.java
@@ -36,6 +36,8 @@ public class ClassDescriptorRegistry implements DescriptorRegistry {
     /** Sequential id generator for class descriptors. */
     private final AtomicInteger idGenerator = new AtomicInteger(BUILTIN_DESCRIPTORS_OFFSET_COUNT);
 
+    // TODO: IGNITE-16464 - do not keep references to classes forever
+
     /** Map class -> descriptor id. */
     private final ConcurrentMap<Class<?>, Integer> idMap = new ConcurrentHashMap<>();
 
diff --git a/modules/network/src/main/java/org/apache/ignite/internal/network/serialization/Classes.java b/modules/network/src/main/java/org/apache/ignite/internal/network/serialization/Classes.java
index b4e71c8..f450b1d 100644
--- a/modules/network/src/main/java/org/apache/ignite/internal/network/serialization/Classes.java
+++ b/modules/network/src/main/java/org/apache/ignite/internal/network/serialization/Classes.java
@@ -97,8 +97,8 @@ public class Classes {
 
     /**
      * Returns {@code true} if a field (or array item) of the described class can only host (at runtime) instances of this type
-     * (and not subtypes), so the runtime type is known upfront. This is also true for enums, even though technically their values
-     * might have subtypes; but we serialize them using their names, so we still treat the type as known upfront.
+     * (and not subtypes), so the runtime marshalling type is known upfront. This is also true for enums, even though technically
+     * their values might have subtypes; but we serialize them using their names, so we still treat the type as known upfront.
      *
      * @return {@code true} if a field (or array item) of the described class can only host (at runtime) instances of the concrete type
      *     that is known upfront
@@ -108,6 +108,11 @@ public class Classes {
             return isRuntimeTypeKnownUpfront(clazz.getComponentType());
         }
 
+        if (clazz == String.class) {
+            // a String may be represented with more than one built-in type, so sd don't know the type upfront
+            return false;
+        }
+
         return clazz.isPrimitive() || Modifier.isFinal(clazz.getModifiers()) || isRuntimeEnum(clazz);
     }
 
diff --git a/modules/network/src/main/java/org/apache/ignite/internal/network/serialization/FieldDescriptor.java b/modules/network/src/main/java/org/apache/ignite/internal/network/serialization/FieldDescriptor.java
index 09f2764..754ab82 100644
--- a/modules/network/src/main/java/org/apache/ignite/internal/network/serialization/FieldDescriptor.java
+++ b/modules/network/src/main/java/org/apache/ignite/internal/network/serialization/FieldDescriptor.java
@@ -125,8 +125,8 @@ public class FieldDescriptor {
 
     /**
      * Returns {@code true} if the field can only host (at runtime) instances of its declared type (and not subtypes),
-     * so the runtime type is known upfront. This is also true for enums, even though technically their values might have subtypes;
-     * but we serialize them using their names, so we still treat the type as known upfront.
+     * so the runtime marshalling type is known upfront. This is also true for enums, even though technically their values might
+     * have subtypes; but we serialize them using their names, so we still treat the type as known upfront.
      *
      * @return {@code true} if the field can only host (at runtime) instances of the declared type that is known upfront
      */
diff --git a/modules/network/src/main/java/org/apache/ignite/internal/network/serialization/PerSessionSerializationService.java b/modules/network/src/main/java/org/apache/ignite/internal/network/serialization/PerSessionSerializationService.java
index d6200fe..7d8989c 100644
--- a/modules/network/src/main/java/org/apache/ignite/internal/network/serialization/PerSessionSerializationService.java
+++ b/modules/network/src/main/java/org/apache/ignite/internal/network/serialization/PerSessionSerializationService.java
@@ -27,7 +27,6 @@ import it.unimi.dsi.fastutil.ints.IntSet;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.Set;
 import org.apache.ignite.internal.network.NetworkMessagesFactory;
 import org.apache.ignite.internal.network.message.ClassDescriptorMessage;
 import org.apache.ignite.internal.network.message.FieldDescriptorMessage;
@@ -115,8 +114,7 @@ public class PerSessionSerializationService {
      * @throws UserObjectSerializationException If failed to serialize an object.
      * @see SerializationService#writeMarshallable(Object)
      */
-    public <T> MarshalledObject writeMarshallable(T marshallable)
-            throws UserObjectSerializationException {
+    public <T> MarshalledObject writeMarshallable(T marshallable) throws UserObjectSerializationException {
         return serializationService.writeMarshallable(marshallable);
     }
 
@@ -139,12 +137,13 @@ public class PerSessionSerializationService {
     /**
      * Creates a list of messages holding class descriptors.
      *
-     * @param descriptors Class descriptors.
+     * @param descriptorIds Class descriptors.
      * @return List of class descriptor network messages.
      */
     @Nullable
-    public List<ClassDescriptorMessage> createClassDescriptorsMessages(Set<ClassDescriptor> descriptors) {
-        List<ClassDescriptorMessage> messages = descriptors.stream()
+    public List<ClassDescriptorMessage> createClassDescriptorsMessages(IntSet descriptorIds) {
+        List<ClassDescriptorMessage> messages = descriptorIds.intStream()
+                .mapToObj(serializationService::getDescriptor)
                 .filter(descriptor -> {
                     int descriptorId = descriptor.descriptorId();
                     return !sentDescriptors.contains(descriptorId) && !shouldBeBuiltIn(descriptorId);
diff --git a/modules/network/src/main/java/org/apache/ignite/internal/network/serialization/marshal/BuiltInContainerMarshallers.java b/modules/network/src/main/java/org/apache/ignite/internal/network/serialization/marshal/BuiltInContainerMarshallers.java
index 183e80d..1b1fe30 100644
--- a/modules/network/src/main/java/org/apache/ignite/internal/network/serialization/marshal/BuiltInContainerMarshallers.java
+++ b/modules/network/src/main/java/org/apache/ignite/internal/network/serialization/marshal/BuiltInContainerMarshallers.java
@@ -21,8 +21,6 @@ import static java.util.Collections.singletonList;
 import static org.apache.ignite.internal.network.serialization.marshal.ProtocolMarshalling.writeLength;
 
 import java.io.DataInput;
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.BitSet;
@@ -37,6 +35,8 @@ import java.util.Map;
 import java.util.function.IntFunction;
 import org.apache.ignite.internal.network.serialization.ClassDescriptor;
 import org.apache.ignite.internal.network.serialization.Classes;
+import org.apache.ignite.internal.util.io.IgniteDataInput;
+import org.apache.ignite.internal.util.io.IgniteDataOutput;
 
 /**
  * Utility to (un)marshal built-in collections and maps.
@@ -78,7 +78,7 @@ class BuiltInContainerMarshallers {
         this.typedReader = typedReader;
     }
 
-    void writeGenericRefArray(Object[] array, ClassDescriptor arrayDescriptor, DataOutputStream output, MarshallingContext context)
+    void writeGenericRefArray(Object[] array, ClassDescriptor arrayDescriptor, IgniteDataOutput output, MarshallingContext context)
             throws IOException, MarshalException {
         Class<?> componentType = array.getClass().getComponentType();
 
@@ -109,7 +109,7 @@ class BuiltInContainerMarshallers {
     }
 
     @SuppressWarnings("unchecked")
-    <T> void fillGenericRefArray(DataInputStream input, T[] array, UnmarshallingContext context)
+    <T> void fillGenericRefArrayFrom(IgniteDataInput input, T[] array, UnmarshallingContext context)
             throws IOException, UnmarshalException {
         if (array.length == 0) {
             return;
@@ -130,7 +130,7 @@ class BuiltInContainerMarshallers {
         }
     }
 
-    void writeBuiltInCollection(Collection<?> object, ClassDescriptor descriptor, DataOutputStream output, MarshallingContext context)
+    void writeBuiltInCollection(Collection<?> object, ClassDescriptor descriptor, IgniteDataOutput output, MarshallingContext context)
             throws IOException, MarshalException {
         if (supportsAsMutableBuiltInCollection(descriptor)) {
             writeCollection(object, descriptor, output, context);
@@ -156,12 +156,12 @@ class BuiltInContainerMarshallers {
     private void writeCollection(
             Collection<?> collection,
             ClassDescriptor collectionDescriptor,
-            DataOutputStream output,
+            IgniteDataOutput output,
             MarshallingContext context
     ) throws IOException, MarshalException {
-        context.addUsedDescriptor(collectionDescriptor);
-
         BuiltInMarshalling.writeCollection(collection, output, untypedWriter(), context);
+
+        context.addUsedDescriptor(collectionDescriptor);
     }
 
     @SuppressWarnings("unchecked")
@@ -169,7 +169,7 @@ class BuiltInContainerMarshallers {
         return (ValueWriter<T>) untypedWriter;
     }
 
-    private void writeSingletonList(List<?> list, ClassDescriptor listDescriptor, DataOutputStream output, MarshallingContext context)
+    private void writeSingletonList(List<?> list, ClassDescriptor listDescriptor, IgniteDataOutput output, MarshallingContext context)
             throws MarshalException, IOException {
         assert list.size() == 1;
 
@@ -219,7 +219,7 @@ class BuiltInContainerMarshallers {
     }
 
     <T, C extends Collection<T>> void fillBuiltInCollectionFrom(
-            DataInputStream input,
+            IgniteDataInput input,
             C collection,
             ClassDescriptor collectionDescriptor,
             ValueReader<T> elementReader,
@@ -234,7 +234,7 @@ class BuiltInContainerMarshallers {
         BuiltInMarshalling.fillCollectionFrom(input, collection, elementReader, context);
     }
 
-    void writeBuiltInMap(Map<?, ?> map, ClassDescriptor mapDescriptor, DataOutputStream output, MarshallingContext context)
+    void writeBuiltInMap(Map<?, ?> map, ClassDescriptor mapDescriptor, IgniteDataOutput output, MarshallingContext context)
             throws MarshalException, IOException {
         if (!supportsAsBuiltInMap(mapDescriptor)) {
             throw new IllegalStateException("Marshalling of " + mapDescriptor.clazz() + " is not supported, but it's marked as a built-in");
@@ -277,7 +277,7 @@ class BuiltInContainerMarshallers {
     }
 
     <K, V, M extends Map<K, V>> void fillBuiltInMapFrom(
-            DataInputStream input,
+            IgniteDataInput input,
             M map,
             ValueReader<K> keyReader,
             ValueReader<V> valueReader,
diff --git a/modules/network/src/main/java/org/apache/ignite/internal/network/serialization/marshal/BuiltInMarshalling.java b/modules/network/src/main/java/org/apache/ignite/internal/network/serialization/marshal/BuiltInMarshalling.java
index 6b3e798..768ab65 100644
--- a/modules/network/src/main/java/org/apache/ignite/internal/network/serialization/marshal/BuiltInMarshalling.java
+++ b/modules/network/src/main/java/org/apache/ignite/internal/network/serialization/marshal/BuiltInMarshalling.java
@@ -17,14 +17,13 @@
 
 package org.apache.ignite.internal.network.serialization.marshal;
 
+import static java.nio.charset.StandardCharsets.ISO_8859_1;
 import static java.util.Collections.singletonList;
 import static org.apache.ignite.internal.network.serialization.marshal.ProtocolMarshalling.readLength;
 import static org.apache.ignite.internal.network.serialization.marshal.ProtocolMarshalling.writeLength;
 
 import java.io.DataInput;
-import java.io.DataInputStream;
 import java.io.DataOutput;
-import java.io.DataOutputStream;
 import java.io.IOException;
 import java.lang.reflect.Array;
 import java.lang.reflect.Field;
@@ -32,9 +31,15 @@ import java.math.BigDecimal;
 import java.util.BitSet;
 import java.util.Collection;
 import java.util.Date;
+import java.util.List;
 import java.util.Map;
+import java.util.RandomAccess;
 import java.util.UUID;
 import java.util.function.IntFunction;
+import org.apache.ignite.internal.util.StringIntrospection;
+import org.apache.ignite.internal.util.io.IgniteDataInput;
+import org.apache.ignite.internal.util.io.IgniteDataInput.Materializer;
+import org.apache.ignite.internal.util.io.IgniteDataOutput;
 import org.apache.ignite.lang.IgniteUuid;
 import org.jetbrains.annotations.NotNull;
 
@@ -46,6 +51,8 @@ class BuiltInMarshalling {
     private static final IntFunction<Class<?>[]> classArrayFactory = Class[]::new;
     private static final ValueReader<Class<?>> classReader = BuiltInMarshalling::readClass;
 
+    private static final Materializer<String> LATIN1_MATERIALIZER = (bytes, offset, len) -> new String(bytes, offset, len, ISO_8859_1);
+
     private static final Field singletonListElementField;
 
     static {
@@ -65,6 +72,16 @@ class BuiltInMarshalling {
         return input.readUTF();
     }
 
+    static void writeLatin1String(String string, IgniteDataOutput output) throws IOException {
+        byte[] bytes = StringIntrospection.fastLatin1Bytes(string);
+        writeByteArray(bytes, output);
+    }
+
+    static String readLatin1String(IgniteDataInput input) throws IOException {
+        int length = readLength(input);
+        return input.materializeFromNextBytes(length, LATIN1_MATERIALIZER);
+    }
+
     static Object readBareObject(@SuppressWarnings("unused") DataInput input) {
         return new Object();
     }
@@ -97,128 +114,128 @@ class BuiltInMarshalling {
         return new Date(input.readLong());
     }
 
-    static void writeByteArray(byte[] array, DataOutput output) throws IOException {
+    static void writeByteArray(byte[] array, IgniteDataOutput output) throws IOException {
         writeLength(array.length, output);
-        output.write(array);
+        output.writeByteArray(array);
     }
 
-    static byte[] readByteArray(DataInput input) throws IOException {
+    static byte[] readByteArray(IgniteDataInput input) throws IOException {
         int length = readLength(input);
-        byte[] array = new byte[length];
-        input.readFully(array);
-        return array;
+        return input.readByteArray(length);
     }
 
-    static void writeShortArray(short[] array, DataOutput output) throws IOException {
+    static void writeShortArray(short[] array, IgniteDataOutput output) throws IOException {
         writeLength(array.length, output);
-        for (short sh : array) {
-            output.writeShort(sh);
-        }
+        output.writeShortArray(array);
     }
 
-    static short[] readShortArray(DataInput input) throws IOException {
+    static short[] readShortArray(IgniteDataInput input) throws IOException {
         int length = readLength(input);
-        short[] array = new short[length];
-        for (int i = 0; i < length; i++) {
-            array[i] = input.readShort();
-        }
-        return array;
+        return input.readShortArray(length);
     }
 
-    static void writeIntArray(int[] array, DataOutput output) throws IOException {
+    static void writeIntArray(int[] array, IgniteDataOutput output) throws IOException {
         writeLength(array.length, output);
-        for (int sh : array) {
-            output.writeInt(sh);
-        }
+        output.writeIntArray(array);
     }
 
-    static int[] readIntArray(DataInput input) throws IOException {
+    static int[] readIntArray(IgniteDataInput input) throws IOException {
         int length = readLength(input);
-        int[] array = new int[length];
-        for (int i = 0; i < length; i++) {
-            array[i] = input.readInt();
-        }
-        return array;
+        return input.readIntArray(length);
     }
 
-    static void writeFloatArray(float[] array, DataOutput output) throws IOException {
+    static void writeFloatArray(float[] array, IgniteDataOutput output) throws IOException {
         writeLength(array.length, output);
-        for (float sh : array) {
-            output.writeFloat(sh);
-        }
+        output.writeFloatArray(array);
     }
 
-    static float[] readFloatArray(DataInput input) throws IOException {
+    static float[] readFloatArray(IgniteDataInput input) throws IOException {
         int length = readLength(input);
-        float[] array = new float[length];
-        for (int i = 0; i < length; i++) {
-            array[i] = input.readFloat();
-        }
-        return array;
+        return input.readFloatArray(length);
     }
 
-    static void writeLongArray(long[] array, DataOutput output) throws IOException {
+    static void writeLongArray(long[] array, IgniteDataOutput output) throws IOException {
         writeLength(array.length, output);
-        for (long sh : array) {
-            output.writeLong(sh);
-        }
+        output.writeLongArray(array);
     }
 
-    static long[] readLongArray(DataInput input) throws IOException {
+    static long[] readLongArray(IgniteDataInput input) throws IOException {
         int length = readLength(input);
-        long[] array = new long[length];
-        for (int i = 0; i < length; i++) {
-            array[i] = input.readLong();
-        }
-        return array;
+        return input.readLongArray(length);
     }
 
-    static void writeDoubleArray(double[] array, DataOutput output) throws IOException {
+    static void writeDoubleArray(double[] array, IgniteDataOutput output) throws IOException {
         writeLength(array.length, output);
-        for (double sh : array) {
-            output.writeDouble(sh);
-        }
+        output.writeDoubleArray(array);
     }
 
-    static double[] readDoubleArray(DataInput input) throws IOException {
+    static double[] readDoubleArray(IgniteDataInput input) throws IOException {
         int length = readLength(input);
-        double[] array = new double[length];
-        for (int i = 0; i < length; i++) {
-            array[i] = input.readDouble();
-        }
-        return array;
+        return input.readDoubleArray(length);
     }
 
-    static void writeBooleanArray(boolean[] array, DataOutput output) throws IOException {
+    static void writeBooleanArray(boolean[] array, IgniteDataOutput output) throws IOException {
         writeLength(array.length, output);
-        for (boolean sh : array) {
-            output.writeBoolean(sh);
+
+        byte bits = 0;
+        int writtenBytes = 0;
+        for (int i = 0; i < array.length; i++) {
+            boolean bit = array[i];
+            int bitIndex = i % 8;
+            if (bit) {
+                bits |= (1 << bitIndex);
+            }
+            if (bitIndex == 7) {
+                output.writeByte(bits);
+                writtenBytes++;
+                bits = 0;
+            }
+        }
+
+        int totalBytesToWrite = numberOfBytesToPackBits(array.length);
+        if (writtenBytes < totalBytesToWrite) {
+            output.writeByte(bits);
         }
     }
 
-    static boolean[] readBooleanArray(DataInput input) throws IOException {
+    private static int numberOfBytesToPackBits(int length) {
+        return length / 8 + (length % 8 == 0 ? 0 : 1);
+    }
+
+    static boolean[] readBooleanArray(IgniteDataInput input) throws IOException {
         int length = readLength(input);
+
         boolean[] array = new boolean[length];
-        for (int i = 0; i < length; i++) {
-            array[i] = input.readBoolean();
+
+        int totalBytesToRead = numberOfBytesToPackBits(length);
+
+        for (int byteIndex = 0; byteIndex < totalBytesToRead; byteIndex++) {
+            byte bits = input.readByte();
+
+            int bitsToReadInThisByte;
+            if (byteIndex < totalBytesToRead - 1) {
+                bitsToReadInThisByte = 8;
+            } else {
+                bitsToReadInThisByte = length - (totalBytesToRead - 1) * 8;
+            }
+            for (int bitIndex = 0; bitIndex < bitsToReadInThisByte; bitIndex++) {
+                if ((bits & (1 << bitIndex)) != 0) {
+                    array[byteIndex * 8 + bitIndex] = true;
+                }
+            }
         }
+
         return array;
     }
 
-    static void writeCharArray(char[] array, DataOutput output) throws IOException {
+    static void writeCharArray(char[] array, IgniteDataOutput output) throws IOException {
         writeLength(array.length, output);
-        for (char sh : array) {
-            output.writeChar(sh);
-        }
+        output.writeCharArray(array);
     }
 
-    static char[] readCharArray(DataInput input) throws IOException {
+    static char[] readCharArray(IgniteDataInput input) throws IOException {
         int length = readLength(input);
-        char[] array = new char[length];
-        for (int i = 0; i < length; i++) {
-            array[i] = input.readChar();
-        }
-        return array;
+        return input.readCharArray(length);
     }
 
     static void writeBigDecimal(BigDecimal object, DataOutput output) throws IOException {
@@ -256,16 +273,16 @@ class BuiltInMarshalling {
         return classByName(className, context.classLoader());
     }
 
-    static void writeClassArray(Class<?>[] classes, DataOutputStream output, MarshallingContext context)
+    static void writeClassArray(Class<?>[] classes, IgniteDataOutput output, MarshallingContext context)
             throws IOException, MarshalException {
         writeRefArray(classes, output, classWriter, context);
     }
 
-    static Class<?>[] readClassArray(DataInputStream input, UnmarshallingContext context) throws IOException, UnmarshalException {
+    static Class<?>[] readClassArray(IgniteDataInput input, UnmarshallingContext context) throws IOException, UnmarshalException {
         return readRefArray(input, classArrayFactory, classReader, context);
     }
 
-    private static <T> void writeRefArray(T[] array, DataOutputStream output, ValueWriter<T> valueWriter, MarshallingContext context)
+    private static <T> void writeRefArray(T[] array, IgniteDataOutput output, ValueWriter<T> valueWriter, MarshallingContext context)
             throws IOException, MarshalException {
         writeLength(array.length, output);
         for (T object : array) {
@@ -274,7 +291,7 @@ class BuiltInMarshalling {
     }
 
     private static <T> T[] readRefArray(
-            DataInputStream input,
+            IgniteDataInput input,
             IntFunction<T[]> arrayFactory,
             ValueReader<T> valueReader,
             UnmarshallingContext context
@@ -287,7 +304,7 @@ class BuiltInMarshalling {
         return array;
     }
 
-    private static <T> void fillRefArrayFrom(DataInputStream input, T[] array, ValueReader<T> valueReader, UnmarshallingContext context)
+    private static <T> void fillRefArrayFrom(IgniteDataInput input, T[] array, ValueReader<T> valueReader, UnmarshallingContext context)
             throws IOException, UnmarshalException {
         for (int i = 0; i < array.length; i++) {
             array[i] = valueReader.read(input, context);
@@ -309,19 +326,35 @@ class BuiltInMarshalling {
 
     static <T> void writeCollection(
             Collection<T> collection,
-            DataOutputStream output,
+            IgniteDataOutput output,
             ValueWriter<T> valueWriter,
             MarshallingContext context
     ) throws IOException, MarshalException {
         writeLength(collection.size(), output);
 
-        for (T object : collection) {
-            valueWriter.write(object, output, context);
+        if (collection instanceof List && collection instanceof RandomAccess) {
+            writeRandomAccessListElements(output, valueWriter, context, (List<T>) collection);
+        } else {
+            for (T object : collection) {
+                valueWriter.write(object, output, context);
+            }
+        }
+    }
+
+    private static <T> void writeRandomAccessListElements(
+            IgniteDataOutput output,
+            ValueWriter<T> valueWriter,
+            MarshallingContext context,
+            List<T> list
+    ) throws IOException, MarshalException {
+        //noinspection ForLoopReplaceableByForEach
+        for (int i = 0; i < list.size(); i++) {
+            valueWriter.write(list.get(i), output, context);
         }
     }
 
     static <T, C extends Collection<T>> void fillCollectionFrom(
-            DataInputStream input,
+            IgniteDataInput input,
             C collection,
             ValueReader<T> valueReader,
             UnmarshallingContext context
@@ -339,7 +372,7 @@ class BuiltInMarshalling {
     }
 
     static <T, C extends Collection<T>> void fillSingletonCollectionFrom(
-            DataInputStream input,
+            IgniteDataInput input,
             C collection,
             ValueReader<T> elementReader,
             UnmarshallingContext context
@@ -355,7 +388,7 @@ class BuiltInMarshalling {
 
     static <K, V> void writeMap(
             Map<K, V> map,
-            DataOutputStream output,
+            IgniteDataOutput output,
             ValueWriter<K> keyWriter,
             ValueWriter<V> valueWriter,
             MarshallingContext context
@@ -369,7 +402,7 @@ class BuiltInMarshalling {
     }
 
     static <K, V, M extends Map<K, V>> void fillMapFrom(
-            DataInputStream input,
+            IgniteDataInput input,
             M map,
             ValueReader<K> keyReader,
             ValueReader<V> valueReader,
@@ -387,11 +420,11 @@ class BuiltInMarshalling {
         return mapFactory.apply(length);
     }
 
-    static void writeBitSet(BitSet object, DataOutput output) throws IOException {
+    static void writeBitSet(BitSet object, IgniteDataOutput output) throws IOException {
         writeByteArray(object.toByteArray(), output);
     }
 
-    static BitSet readBitSet(DataInput input) throws IOException {
+    static BitSet readBitSet(IgniteDataInput input) throws IOException {
         return BitSet.valueOf(readByteArray(input));
     }
 
diff --git a/modules/network/src/main/java/org/apache/ignite/internal/network/serialization/marshal/BuiltInNonContainerMarshallers.java b/modules/network/src/main/java/org/apache/ignite/internal/network/serialization/marshal/BuiltInNonContainerMarshallers.java
index 31f3693..8a6484e 100644
--- a/modules/network/src/main/java/org/apache/ignite/internal/network/serialization/marshal/BuiltInNonContainerMarshallers.java
+++ b/modules/network/src/main/java/org/apache/ignite/internal/network/serialization/marshal/BuiltInNonContainerMarshallers.java
@@ -18,9 +18,7 @@
 package org.apache.ignite.internal.network.serialization.marshal;
 
 import java.io.DataInput;
-import java.io.DataInputStream;
 import java.io.DataOutput;
-import java.io.DataOutputStream;
 import java.io.IOException;
 import java.math.BigDecimal;
 import java.util.BitSet;
@@ -30,6 +28,8 @@ import java.util.Map;
 import java.util.UUID;
 import org.apache.ignite.internal.network.serialization.ClassDescriptor;
 import org.apache.ignite.internal.network.serialization.Null;
+import org.apache.ignite.internal.util.io.IgniteDataInput;
+import org.apache.ignite.internal.util.io.IgniteDataOutput;
 import org.apache.ignite.lang.IgniteUuid;
 
 /**
@@ -122,32 +122,43 @@ class BuiltInNonContainerMarshallers {
      * @return {@code true} if we the given descriptor is a built-in we can handle
      */
     boolean supports(ClassDescriptor descriptor) {
-        return descriptor.isEnum() || builtInMarshallers.containsKey(descriptor.clazz());
+        return descriptor.isEnum() || descriptor.isLatin1String()
+                || builtInMarshallers.containsKey(descriptor.clazz());
     }
 
-    void writeBuiltIn(Object object, ClassDescriptor descriptor, DataOutputStream output, MarshallingContext context)
+    void writeBuiltIn(Object object, ClassDescriptor descriptor, IgniteDataOutput output, MarshallingContext context)
             throws IOException, MarshalException {
+        actuallyWrite(object, descriptor, output, context);
+
+        context.addUsedDescriptor(descriptor);
+    }
+
+    private void actuallyWrite(Object object, ClassDescriptor descriptor, IgniteDataOutput output, MarshallingContext context)
+            throws IOException, MarshalException {
+        if (descriptor.isLatin1String()) {
+            BuiltInMarshalling.writeLatin1String((String) object, output);
+            return;
+        }
         if (descriptor.isEnum()) {
-            writeEnum((Enum<?>) object, descriptor, output, context);
+            BuiltInMarshalling.writeEnum((Enum<?>) object, output);
             return;
         }
 
-        BuiltInMarshaller<?> builtInMarshaller = findBuiltInMarshaller(descriptor);
-
-        builtInMarshaller.marshal(object, output, context);
-
-        context.addUsedDescriptor(descriptor);
+        writeWithBuiltInMarshaller(object, descriptor, output, context);
     }
 
-    private void writeEnum(Enum<?> object, ClassDescriptor descriptor, DataOutputStream output, MarshallingContext context)
-            throws IOException {
-        BuiltInMarshalling.writeEnum(object, output);
+    private void writeWithBuiltInMarshaller(Object object, ClassDescriptor descriptor, IgniteDataOutput output, MarshallingContext context)
+            throws IOException, MarshalException {
+        BuiltInMarshaller<?> builtInMarshaller = findBuiltInMarshaller(descriptor);
 
-        context.addUsedDescriptor(descriptor);
+        builtInMarshaller.marshal(object, output, context);
     }
 
-    Object readBuiltIn(ClassDescriptor descriptor, DataInputStream input, UnmarshallingContext context)
+    Object readBuiltIn(ClassDescriptor descriptor, IgniteDataInput input, UnmarshallingContext context)
             throws IOException, UnmarshalException {
+        if (descriptor.isLatin1String()) {
+            return BuiltInMarshalling.readLatin1String(input);
+        }
         if (descriptor.isEnum()) {
             return readEnum(descriptor, input);
         }
@@ -157,7 +168,7 @@ class BuiltInNonContainerMarshallers {
     }
 
     @SuppressWarnings({"unchecked", "rawtypes"})
-    private Object readEnum(ClassDescriptor descriptor, DataInputStream input) throws IOException {
+    private Object readEnum(ClassDescriptor descriptor, DataInput input) throws IOException {
         return BuiltInMarshalling.readEnum(input, (Class<? extends Enum>) descriptor.clazz());
     }
 
@@ -180,11 +191,11 @@ class BuiltInNonContainerMarshallers {
             this.reader = reader;
         }
 
-        private void marshal(Object object, DataOutputStream output, MarshallingContext context) throws IOException, MarshalException {
+        private void marshal(Object object, IgniteDataOutput output, MarshallingContext context) throws IOException, MarshalException {
             writer.write(valueRefClass.cast(object), output, context);
         }
 
-        private Object unmarshal(DataInputStream input, UnmarshallingContext context) throws IOException, UnmarshalException {
+        private Object unmarshal(IgniteDataInput input, UnmarshallingContext context) throws IOException, UnmarshalException {
             return reader.read(input, context);
         }
     }
@@ -198,7 +209,7 @@ class BuiltInNonContainerMarshallers {
          * @throws IOException      if an I/O problem occurs
          * @throws MarshalException if another problem occurs
          */
-        void write(T value, DataOutput output) throws IOException, MarshalException;
+        void write(T value, IgniteDataOutput output) throws IOException, MarshalException;
     }
 
 
@@ -211,6 +222,6 @@ class BuiltInNonContainerMarshallers {
          * @throws IOException          if an I/O problem occurs
          * @throws UnmarshalException   if another problem (like {@link ClassNotFoundException}) occurs
          */
-        T read(DataInput input) throws IOException, UnmarshalException;
+        T read(IgniteDataInput input) throws IOException, UnmarshalException;
     }
 }
diff --git a/modules/network/src/main/java/org/apache/ignite/internal/network/serialization/marshal/DefaultFieldsReaderWriter.java b/modules/network/src/main/java/org/apache/ignite/internal/network/serialization/marshal/DefaultFieldsReaderWriter.java
index 1e33ed8..be371d0 100644
--- a/modules/network/src/main/java/org/apache/ignite/internal/network/serialization/marshal/DefaultFieldsReaderWriter.java
+++ b/modules/network/src/main/java/org/apache/ignite/internal/network/serialization/marshal/DefaultFieldsReaderWriter.java
@@ -18,11 +18,12 @@
 package org.apache.ignite.internal.network.serialization.marshal;
 
 import java.io.DataInput;
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
+import java.io.DataOutput;
 import java.io.IOException;
 import java.util.BitSet;
 import org.apache.ignite.internal.network.serialization.ClassDescriptor;
+import org.apache.ignite.internal.util.io.IgniteDataInput;
+import org.apache.ignite.internal.util.io.IgniteDataOutput;
 import org.jetbrains.annotations.Nullable;
 
 /**
@@ -40,11 +41,11 @@ interface DefaultFieldsReaderWriter {
      * @throws MarshalException if something goes wrong
      * @throws IOException      if I/O fails
      */
-    void defaultWriteFields(Object object, ClassDescriptor descriptor, DataOutputStream output, MarshallingContext context)
+    void defaultWriteFields(Object object, ClassDescriptor descriptor, IgniteDataOutput output, MarshallingContext context)
             throws MarshalException, IOException;
 
     @Nullable
-    BitSet writeNullsBitSet(Object object, ClassDescriptor descriptor, DataOutputStream output) throws IOException;
+    BitSet writeNullsBitSet(Object object, ClassDescriptor descriptor, DataOutput output) throws IOException;
 
     /**
      * Reads object fields from the input stream and stores them in the object using default marshalling.
@@ -56,7 +57,7 @@ interface DefaultFieldsReaderWriter {
      * @throws IOException          if I/O fails
      * @throws UnmarshalException   if something goes wrong
      */
-    void defaultFillFieldsFrom(DataInputStream input, Object object, ClassDescriptor descriptor, UnmarshallingContext context)
+    void defaultFillFieldsFrom(IgniteDataInput input, Object object, ClassDescriptor descriptor, UnmarshallingContext context)
             throws IOException, UnmarshalException;
 
     @Nullable
diff --git a/modules/network/src/main/java/org/apache/ignite/internal/network/serialization/marshal/DefaultUserObjectMarshaller.java b/modules/network/src/main/java/org/apache/ignite/internal/network/serialization/marshal/DefaultUserObjectMarshaller.java
index e4ec1ed..34d1039 100644
--- a/modules/network/src/main/java/org/apache/ignite/internal/network/serialization/marshal/DefaultUserObjectMarshaller.java
+++ b/modules/network/src/main/java/org/apache/ignite/internal/network/serialization/marshal/DefaultUserObjectMarshaller.java
@@ -19,12 +19,8 @@ package org.apache.ignite.internal.network.serialization.marshal;
 
 import static org.apache.ignite.internal.network.serialization.marshal.ObjectClass.objectClass;
 
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
 import java.io.DataInput;
-import java.io.DataInputStream;
 import java.io.DataOutput;
-import java.io.DataOutputStream;
 import java.io.Externalizable;
 import java.io.IOException;
 import java.io.InputStream;
@@ -37,6 +33,9 @@ import org.apache.ignite.internal.network.serialization.ClassDescriptorFactory;
 import org.apache.ignite.internal.network.serialization.ClassDescriptorRegistry;
 import org.apache.ignite.internal.network.serialization.DescriptorRegistry;
 import org.apache.ignite.internal.network.serialization.SpecialMethodInvocationException;
+import org.apache.ignite.internal.util.io.IgniteDataInput;
+import org.apache.ignite.internal.util.io.IgniteDataOutput;
+import org.apache.ignite.internal.util.io.IgniteUnsafeDataInput;
 import org.jetbrains.annotations.Nullable;
 
 /**
@@ -59,8 +58,16 @@ public class DefaultUserObjectMarshaller implements UserObjectMarshaller {
     private final ExternalizableMarshaller externalizableMarshaller;
     private final ProxyMarshaller proxyMarshaller;
 
+    private final MarshallingValidations validations = new MarshallingValidations();
+
     private final ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
 
+    private final ThreadLocal<UosIgniteOutputStream> threadLocalDataOutput = ThreadLocal.withInitial(this::newOutput);
+
+    private UosIgniteOutputStream newOutput() {
+        return new UosIgniteOutputStream(4096);
+    }
+
     /**
      * Constructor.
      *
@@ -94,17 +101,34 @@ public class DefaultUserObjectMarshaller implements UserObjectMarshaller {
     public MarshalledObject marshal(@Nullable Object object) throws MarshalException {
         MarshallingContext context = new MarshallingContext();
 
-        var baos = new ByteArrayOutputStream();
-        try (var dos = new DataOutputStream(baos)) {
-            marshalShared(object, dos, context);
+        UosIgniteOutputStream output = freshByteArrayOutputStream();
+        try {
+            marshalShared(object, output, context);
         } catch (IOException e) {
             throw new MarshalException("Cannot marshal", e);
+        } finally {
+            output.release();
         }
 
-        return new MarshalledObject(baos.toByteArray(), context.usedDescriptors());
+        return new MarshalledObject(output.array(), context.usedDescriptorIds());
     }
 
-    private void marshalShared(@Nullable Object object, DataOutputStream output, MarshallingContext context)
+    private UosIgniteOutputStream freshByteArrayOutputStream() {
+        UosIgniteOutputStream output = threadLocalDataOutput.get();
+
+        if (output.isOccupied()) {
+            // This is a nested invocation, probably from a callback method like writeObject(), we can't reuse
+            // the 'outer' output, let's make a new one as this should not happen often.
+            output = newOutput();
+        } else {
+            output.cleanup();
+        }
+        output.occupy();
+
+        return output;
+    }
+
+    private void marshalShared(@Nullable Object object, IgniteDataOutput output, MarshallingContext context)
             throws MarshalException, IOException {
         marshalShared(object, NO_DECLARED_CLASS, output, context);
     }
@@ -112,13 +136,13 @@ public class DefaultUserObjectMarshaller implements UserObjectMarshaller {
     private void marshalShared(
             @Nullable Object object,
             @Nullable Class<?> declaredClass,
-            DataOutputStream output,
+            IgniteDataOutput output,
             MarshallingContext context
     ) throws MarshalException, IOException {
         marshalToOutput(object, declaredClass, output, context, NOT_UNSHARED);
     }
 
-    private void marshalUnshared(@Nullable Object object, Class<?> declaredClass, DataOutputStream output, MarshallingContext context)
+    private void marshalUnshared(@Nullable Object object, Class<?> declaredClass, IgniteDataOutput output, MarshallingContext context)
             throws MarshalException, IOException {
         marshalToOutput(object, declaredClass, output, context, UNSHARED);
     }
@@ -126,7 +150,7 @@ public class DefaultUserObjectMarshaller implements UserObjectMarshaller {
     private void marshalToOutput(
             @Nullable Object object,
             @Nullable Class<?> declaredClass,
-            DataOutputStream output,
+            IgniteDataOutput output,
             MarshallingContext context,
             boolean unshared
     ) throws MarshalException, IOException {
@@ -137,7 +161,7 @@ public class DefaultUserObjectMarshaller implements UserObjectMarshaller {
                 || declaredClass.isAssignableFrom(object.getClass())
                 : "Object " + object + " is expected to be an instance of subclass of " + declaredClass + ", but it's " + object.getClass();
 
-        MarshallingValidations.throwIfMarshallingNotSupported(object);
+        validations.throwIfMarshallingNotSupported(object);
 
         ClassDescriptor originalDescriptor = localDescriptors.getOrCreateDescriptor(object, declaredClass);
 
@@ -207,7 +231,7 @@ public class DefaultUserObjectMarshaller implements UserObjectMarshaller {
             ClassDescriptor descriptor,
             @Nullable Class<?> declaredClass,
             int objectId,
-            DataOutputStream output,
+            IgniteDataOutput output,
             MarshallingContext context
     ) throws IOException, MarshalException {
         if (!runtimeTypeIsKnownUpfront(declaredClass)) {
@@ -235,7 +259,7 @@ public class DefaultUserObjectMarshaller implements UserObjectMarshaller {
             Object object,
             ClassDescriptor descriptor,
             Class<?> declaredClass,
-            DataOutputStream output,
+            IgniteDataOutput output,
             MarshallingContext context
     ) throws IOException, MarshalException {
         if (!runtimeTypeIsKnownUpfront(declaredClass)) {
@@ -245,7 +269,7 @@ public class DefaultUserObjectMarshaller implements UserObjectMarshaller {
         writeObject(object, descriptor, output, context);
     }
 
-    private void writeObject(@Nullable Object object, ClassDescriptor descriptor, DataOutputStream output, MarshallingContext context)
+    private void writeObject(@Nullable Object object, ClassDescriptor descriptor, IgniteDataOutput output, MarshallingContext context)
             throws IOException, MarshalException {
         if (isBuiltInNonContainer(descriptor)) {
             builtInNonContainerMarshallers.writeBuiltIn(object, descriptor, output, context);
@@ -286,12 +310,13 @@ public class DefaultUserObjectMarshaller implements UserObjectMarshaller {
     @Override
     @Nullable
     public <T> T unmarshal(byte[] bytes, DescriptorRegistry mergedDescriptors) throws UnmarshalException {
-        var bais = new ByteArrayInputStream(bytes);
-        try (var dis = new DataInputStream(bais)) {
-            UnmarshallingContext context = new UnmarshallingContext(bais, mergedDescriptors, classLoader);
-            T result = unmarshalShared(dis, context);
+        var input = new IgniteUnsafeDataInput(bytes);
+
+        try {
+            UnmarshallingContext context = new UnmarshallingContext(input, mergedDescriptors, classLoader);
+            T result = unmarshalShared(input, context);
 
-            throwIfNotDrained(dis);
+            throwIfNotDrained(input);
 
             return result;
         } catch (IOException e) {
@@ -299,22 +324,22 @@ public class DefaultUserObjectMarshaller implements UserObjectMarshaller {
         }
     }
 
-    private <T> T unmarshalShared(DataInputStream input, UnmarshallingContext context) throws IOException, UnmarshalException {
+    private <T> T unmarshalShared(IgniteDataInput input, UnmarshallingContext context) throws IOException, UnmarshalException {
         return unmarshalShared(input, NO_DECLARED_CLASS, context);
     }
 
-    private <T> T unmarshalShared(DataInputStream input, @Nullable Class<?> declaredClass, UnmarshallingContext context)
+    private <T> T unmarshalShared(IgniteDataInput input, @Nullable Class<?> declaredClass, UnmarshallingContext context)
             throws IOException, UnmarshalException {
         return unmarshalFromInput(input, declaredClass, context, NOT_UNSHARED);
     }
 
-    private <T> T unmarshalUnshared(DataInputStream input, @Nullable Class<?> declaredClass, UnmarshallingContext context)
+    private <T> T unmarshalUnshared(IgniteDataInput input, @Nullable Class<?> declaredClass, UnmarshallingContext context)
             throws IOException, UnmarshalException {
         return unmarshalFromInput(input, declaredClass, context, UNSHARED);
     }
 
     private <T> T unmarshalFromInput(
-            DataInputStream input,
+            IgniteDataInput input,
             @Nullable Class<?> declaredClass,
             UnmarshallingContext context,
             boolean unshared
@@ -335,7 +360,7 @@ public class DefaultUserObjectMarshaller implements UserObjectMarshaller {
         return resolvedObject;
     }
 
-    private ClassDescriptor resolveDescriptor(DataInputStream input, @Nullable Class<?> declaredClass, UnmarshallingContext context)
+    private ClassDescriptor resolveDescriptor(IgniteDataInput input, @Nullable Class<?> declaredClass, UnmarshallingContext context)
             throws UnmarshalException, IOException {
         if (runtimeTypeIsKnownUpfront(declaredClass)) {
             return context.resolveDescriptorOfDeclaredClass(declaredClass);
@@ -345,7 +370,7 @@ public class DefaultUserObjectMarshaller implements UserObjectMarshaller {
         }
     }
 
-    private int peekObjectId(DataInputStream input, UnmarshallingContext context) throws IOException {
+    private int peekObjectId(DataInput input, UnmarshallingContext context) throws IOException {
         context.markSource(ProtocolMarshalling.MAX_LENGTH_BYTE_COUNT);
         int objectId = ProtocolMarshalling.readObjectId(input);
         context.resetSourceToMark();
@@ -367,7 +392,7 @@ public class DefaultUserObjectMarshaller implements UserObjectMarshaller {
     }
 
     @Nullable
-    private Object readObject(DataInputStream input, UnmarshallingContext context, ClassDescriptor descriptor, boolean unshared)
+    private Object readObject(IgniteDataInput input, UnmarshallingContext context, ClassDescriptor descriptor, boolean unshared)
             throws IOException, UnmarshalException {
         if (!mayHaveObjectIdentity(descriptor)) {
             return readValue(input, descriptor, context);
@@ -384,7 +409,7 @@ public class DefaultUserObjectMarshaller implements UserObjectMarshaller {
 
     @Nullable
     private Object readIdentifiableInOneStage(
-            DataInputStream input,
+            IgniteDataInput input,
             ClassDescriptor descriptor,
             UnmarshallingContext context,
             boolean unshared
@@ -397,12 +422,12 @@ public class DefaultUserObjectMarshaller implements UserObjectMarshaller {
         return object;
     }
 
-    private int readObjectId(DataInputStream input) throws IOException {
+    private int readObjectId(DataInput input) throws IOException {
         return ProtocolMarshalling.readObjectId(input);
     }
 
     private Object readIdentifiableInTwoStages(
-            DataInputStream input,
+            IgniteDataInput input,
             ClassDescriptor descriptor,
             UnmarshallingContext context,
             boolean unshared
@@ -417,7 +442,7 @@ public class DefaultUserObjectMarshaller implements UserObjectMarshaller {
         return preInstantiatedObject;
     }
 
-    private Object preInstantiate(ClassDescriptor descriptor, DataInputStream input, UnmarshallingContext context)
+    private Object preInstantiate(ClassDescriptor descriptor, IgniteDataInput input, UnmarshallingContext context)
             throws IOException, UnmarshalException {
         if (isBuiltInNonContainer(descriptor)) {
             throw new IllegalStateException("Should not be here, descriptor is " + descriptor);
@@ -436,7 +461,7 @@ public class DefaultUserObjectMarshaller implements UserObjectMarshaller {
         }
     }
 
-    private void fillObjectFrom(DataInputStream input, Object objectToFill, ClassDescriptor descriptor, UnmarshallingContext context)
+    private void fillObjectFrom(IgniteDataInput input, Object objectToFill, ClassDescriptor descriptor, UnmarshallingContext context)
             throws UnmarshalException, IOException {
         if (isBuiltInNonContainer(descriptor)) {
             throw new IllegalStateException("Cannot fill " + descriptor.clazz() + ", this is a programmatic error");
@@ -456,7 +481,7 @@ public class DefaultUserObjectMarshaller implements UserObjectMarshaller {
     }
 
     private void fillBuiltInCollectionFrom(
-            DataInputStream input,
+            IgniteDataInput input,
             Collection<?> collectionToFill,
             ClassDescriptor descriptor,
             UnmarshallingContext context
@@ -464,18 +489,18 @@ public class DefaultUserObjectMarshaller implements UserObjectMarshaller {
         builtInContainerMarshallers.fillBuiltInCollectionFrom(input, collectionToFill, descriptor, this::unmarshalShared, context);
     }
 
-    private void fillBuiltInMapFrom(DataInputStream input, Map<?, ?> mapToFill, UnmarshallingContext context)
+    private void fillBuiltInMapFrom(IgniteDataInput input, Map<?, ?> mapToFill, UnmarshallingContext context)
             throws UnmarshalException, IOException {
         builtInContainerMarshallers.fillBuiltInMapFrom(input, mapToFill, this::unmarshalShared, this::unmarshalShared, context);
     }
 
-    private void fillGenericRefArrayFrom(DataInputStream input, Object[] array, UnmarshallingContext context)
+    private void fillGenericRefArrayFrom(IgniteDataInput input, Object[] array, UnmarshallingContext context)
             throws IOException, UnmarshalException {
-        builtInContainerMarshallers.fillGenericRefArray(input, array, context);
+        builtInContainerMarshallers.fillGenericRefArrayFrom(input, array, context);
     }
 
     @Nullable
-    private Object readValue(DataInputStream input, ClassDescriptor descriptor, UnmarshallingContext context)
+    private Object readValue(IgniteDataInput input, ClassDescriptor descriptor, UnmarshallingContext context)
             throws IOException, UnmarshalException {
         if (isBuiltInNonContainer(descriptor)) {
             return builtInNonContainerMarshallers.readBuiltIn(descriptor, input, context);
diff --git a/modules/network/src/main/java/org/apache/ignite/internal/network/serialization/marshal/ExternalizableMarshaller.java b/modules/network/src/main/java/org/apache/ignite/internal/network/serialization/marshal/ExternalizableMarshaller.java
index fa6b753..eb9d5c7 100644
--- a/modules/network/src/main/java/org/apache/ignite/internal/network/serialization/marshal/ExternalizableMarshaller.java
+++ b/modules/network/src/main/java/org/apache/ignite/internal/network/serialization/marshal/ExternalizableMarshaller.java
@@ -17,11 +17,11 @@
 
 package org.apache.ignite.internal.network.serialization.marshal;
 
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
 import java.io.Externalizable;
 import java.io.IOException;
 import org.apache.ignite.internal.network.serialization.ClassDescriptor;
+import org.apache.ignite.internal.util.io.IgniteDataInput;
+import org.apache.ignite.internal.util.io.IgniteDataOutput;
 
 /**
  * (Um)marshalling specific to EXTERNALIZABLE serialization type.
@@ -49,14 +49,14 @@ class ExternalizableMarshaller {
         this.defaultFieldsReaderWriter = defaultFieldsReaderWriter;
     }
 
-    void writeExternalizable(Externalizable externalizable, ClassDescriptor descriptor, DataOutputStream output, MarshallingContext context)
+    void writeExternalizable(Externalizable externalizable, ClassDescriptor descriptor, IgniteDataOutput output, MarshallingContext context)
             throws IOException {
         externalizeTo(externalizable, output, context);
 
         context.addUsedDescriptor(descriptor);
     }
 
-    private void externalizeTo(Externalizable externalizable, DataOutputStream output, MarshallingContext context)
+    private void externalizeTo(Externalizable externalizable, IgniteDataOutput output, MarshallingContext context)
             throws IOException {
         // Do not close the stream yet!
         UosObjectOutputStream oos = context.objectOutputStream(output, valueWriter, unsharedWriter, defaultFieldsReaderWriter);
@@ -81,7 +81,7 @@ class ExternalizableMarshaller {
         }
     }
 
-    <T extends Externalizable> void fillExternalizableFrom(DataInputStream input, T object, UnmarshallingContext context)
+    <T extends Externalizable> void fillExternalizableFrom(IgniteDataInput input, T object, UnmarshallingContext context)
             throws IOException, UnmarshalException {
         // Do not close the stream yet!
         UosObjectInputStream ois = context.objectInputStream(input, valueReader, unsharedReader, defaultFieldsReaderWriter);
diff --git a/modules/network/src/main/java/org/apache/ignite/internal/network/serialization/marshal/Bits.java b/modules/network/src/main/java/org/apache/ignite/internal/network/serialization/marshal/LittleEndianBits.java
similarity index 60%
rename from modules/network/src/main/java/org/apache/ignite/internal/network/serialization/marshal/Bits.java
rename to modules/network/src/main/java/org/apache/ignite/internal/network/serialization/marshal/LittleEndianBits.java
index ae5085c..d64a8c7 100644
--- a/modules/network/src/main/java/org/apache/ignite/internal/network/serialization/marshal/Bits.java
+++ b/modules/network/src/main/java/org/apache/ignite/internal/network/serialization/marshal/LittleEndianBits.java
@@ -19,27 +19,28 @@ package org.apache.ignite.internal.network.serialization.marshal;
 
 /**
  * Code for packing/unpacking primitive values to/from a byte array at specific offsets.
+ * Little endian byte order is used.
  */
-class Bits {
+class LittleEndianBits {
     static boolean getBoolean(byte[] b, int off) {
         return b[off] != 0;
     }
 
     static char getChar(byte[] b, int off) {
-        return (char) ((b[off + 1] & 0xFF)
-                + (b[off] << 8));
+        return (char) ((b[off] & 0xFF)
+                + (b[off + 1] << 8));
     }
 
     static short getShort(byte[] b, int off) {
-        return (short) ((b[off + 1] & 0xFF)
-                + (b[off] << 8));
+        return (short) ((b[off] & 0xFF)
+                + (b[off + 1] << 8));
     }
 
     static int getInt(byte[] b, int off) {
-        return ((b[off + 3] & 0xFF))
-                + ((b[off + 2] & 0xFF) << 8)
-                + ((b[off + 1] & 0xFF) << 16)
-                + ((b[off]) << 24);
+        return ((b[off] & 0xFF))
+                + ((b[off + 1] & 0xFF) << 8)
+                + ((b[off + 2] & 0xFF) << 16)
+                + ((b[off + 3]) << 24);
     }
 
     static float getFloat(byte[] b, int off) {
@@ -47,14 +48,14 @@ class Bits {
     }
 
     static long getLong(byte[] b, int off) {
-        return ((b[off + 7] & 0xFFL))
-                + ((b[off + 6] & 0xFFL) << 8)
-                + ((b[off + 5] & 0xFFL) << 16)
-                + ((b[off + 4] & 0xFFL) << 24)
-                + ((b[off + 3] & 0xFFL) << 32)
-                + ((b[off + 2] & 0xFFL) << 40)
-                + ((b[off + 1] & 0xFFL) << 48)
-                + (((long) b[off]) << 56);
+        return ((b[off] & 0xFFL))
+                + ((b[off + 1] & 0xFFL) << 8)
+                + ((b[off + 2] & 0xFFL) << 16)
+                + ((b[off + 3] & 0xFFL) << 24)
+                + ((b[off + 4] & 0xFFL) << 32)
+                + ((b[off + 5] & 0xFFL) << 40)
+                + ((b[off + 6] & 0xFFL) << 48)
+                + (((long) b[off + 7]) << 56);
     }
 
     static double getDouble(byte[] b, int off) {
@@ -66,20 +67,20 @@ class Bits {
     }
 
     static void putChar(byte[] b, int off, char val) {
-        b[off + 1] = (byte) (val);
-        b[off] = (byte) (val >>> 8);
+        b[off] = (byte) (val);
+        b[off + 1] = (byte) (val >>> 8);
     }
 
     static void putShort(byte[] b, int off, short val) {
-        b[off + 1] = (byte) (val);
-        b[off] = (byte) (val >>> 8);
+        b[off] = (byte) (val);
+        b[off + 1] = (byte) (val >>> 8);
     }
 
     static void putInt(byte[] b, int off, int val) {
-        b[off + 3] = (byte) (val);
-        b[off + 2] = (byte) (val >>> 8);
-        b[off + 1] = (byte) (val >>> 16);
-        b[off] = (byte) (val >>> 24);
+        b[off] = (byte) (val);
+        b[off + 1] = (byte) (val >>> 8);
+        b[off + 2] = (byte) (val >>> 16);
+        b[off + 3] = (byte) (val >>> 24);
     }
 
     static void putFloat(byte[] b, int off, float val) {
@@ -87,14 +88,14 @@ class Bits {
     }
 
     static void putLong(byte[] b, int off, long val) {
-        b[off + 7] = (byte) (val);
-        b[off + 6] = (byte) (val >>> 8);
-        b[off + 5] = (byte) (val >>> 16);
-        b[off + 4] = (byte) (val >>> 24);
-        b[off + 3] = (byte) (val >>> 32);
-        b[off + 2] = (byte) (val >>> 40);
-        b[off + 1] = (byte) (val >>> 48);
-        b[off] = (byte) (val >>> 56);
+        b[off] = (byte) (val);
+        b[off + 1] = (byte) (val >>> 8);
+        b[off + 2] = (byte) (val >>> 16);
+        b[off + 3] = (byte) (val >>> 24);
+        b[off + 4] = (byte) (val >>> 32);
+        b[off + 5] = (byte) (val >>> 40);
+        b[off + 6] = (byte) (val >>> 48);
+        b[off + 7] = (byte) (val >>> 56);
     }
 
     static void putDouble(byte[] b, int off, double val) {
diff --git a/modules/network/src/main/java/org/apache/ignite/internal/network/serialization/marshal/LocalDescriptors.java b/modules/network/src/main/java/org/apache/ignite/internal/network/serialization/marshal/LocalDescriptors.java
index 8da1e46..6257637 100644
--- a/modules/network/src/main/java/org/apache/ignite/internal/network/serialization/marshal/LocalDescriptors.java
+++ b/modules/network/src/main/java/org/apache/ignite/internal/network/serialization/marshal/LocalDescriptors.java
@@ -17,9 +17,11 @@
 
 package org.apache.ignite.internal.network.serialization.marshal;
 
+import org.apache.ignite.internal.network.serialization.BuiltInType;
 import org.apache.ignite.internal.network.serialization.ClassDescriptor;
 import org.apache.ignite.internal.network.serialization.ClassDescriptorFactory;
 import org.apache.ignite.internal.network.serialization.ClassDescriptorRegistry;
+import org.apache.ignite.internal.util.StringIntrospection;
 import org.jetbrains.annotations.Nullable;
 
 /**
@@ -38,6 +40,9 @@ class LocalDescriptors {
         if (object == null) {
             return localRegistry.getNullDescriptor();
         }
+        if (object instanceof String && StringIntrospection.supportsFastGetLatin1Bytes((String) object)) {
+            return localRegistry.getBuiltInDescriptor(BuiltInType.STRING_LATIN1);
+        }
 
         // For primitives, we need to keep the declaredClass (it differs from object.getClass()).
         Class<?> classToQueryForOriginalDescriptor = declaredClass != null && declaredClass.isPrimitive()
diff --git a/modules/network/src/main/java/org/apache/ignite/internal/network/serialization/marshal/MarshalledObject.java b/modules/network/src/main/java/org/apache/ignite/internal/network/serialization/marshal/MarshalledObject.java
index 761d5e3..70d1396 100644
--- a/modules/network/src/main/java/org/apache/ignite/internal/network/serialization/marshal/MarshalledObject.java
+++ b/modules/network/src/main/java/org/apache/ignite/internal/network/serialization/marshal/MarshalledObject.java
@@ -17,10 +17,8 @@
 
 package org.apache.ignite.internal.network.serialization.marshal;
 
-import java.util.Arrays;
+import it.unimi.dsi.fastutil.ints.IntSet;
 import java.util.Objects;
-import java.util.Set;
-import org.apache.ignite.internal.network.serialization.ClassDescriptor;
 
 /**
  * Represents a marshalled object: the marshalled representation with information about how it was marshalled
@@ -30,21 +28,21 @@ public class MarshalledObject {
     /** Marshalled object representation. */
     private final byte[] bytes;
 
-    /** The descriptors that were used while marshalling the object. */
-    private final Set<ClassDescriptor> usedDescriptors;
+    /** IDs of the descriptors that were used while marshalling the object. */
+    private final IntSet usedDescriptorIds;
 
     /**
      * Creates a new {@link MarshalledObject}.
      *
      * @param bytes           marshalled representation bytes
-     * @param usedDescriptors the descriptors that were used to marshal the object
+     * @param usedDescriptorIds the descriptors that were used to marshal the object
      */
-    public MarshalledObject(byte[] bytes, Set<ClassDescriptor> usedDescriptors) {
+    public MarshalledObject(byte[] bytes, IntSet usedDescriptorIds) {
         Objects.requireNonNull(bytes, "bytes is null");
-        Objects.requireNonNull(usedDescriptors, "usedDescriptors is null");
+        Objects.requireNonNull(usedDescriptorIds, "usedDescriptorIds is null");
 
-        this.bytes = Arrays.copyOf(bytes, bytes.length);
-        this.usedDescriptors = Set.copyOf(usedDescriptors);
+        this.bytes = bytes;
+        this.usedDescriptorIds = usedDescriptorIds;
     }
 
     /**
@@ -53,15 +51,15 @@ public class MarshalledObject {
      * @return marshalled object representation
      */
     public byte[] bytes() {
-        return Arrays.copyOf(bytes, bytes.length);
+        return bytes;
     }
 
     /**
-     * Returns the descriptors that were used while marshalling the object.
+     * Returns IDs of the descriptors that were used while marshalling the object.
      *
-     * @return the descriptors that were used while marshalling the object
+     * @return IDs of the descriptors that were used while marshalling the object
      */
-    public Set<ClassDescriptor> usedDescriptors() {
-        return usedDescriptors;
+    public IntSet usedDescriptorIds() {
+        return usedDescriptorIds;
     }
 }
diff --git a/modules/network/src/main/java/org/apache/ignite/internal/network/serialization/marshal/MarshallingContext.java b/modules/network/src/main/java/org/apache/ignite/internal/network/serialization/marshal/MarshallingContext.java
index 4f90487..57b3c53 100644
--- a/modules/network/src/main/java/org/apache/ignite/internal/network/serialization/marshal/MarshallingContext.java
+++ b/modules/network/src/main/java/org/apache/ignite/internal/network/serialization/marshal/MarshallingContext.java
@@ -17,25 +17,24 @@
 
 package org.apache.ignite.internal.network.serialization.marshal;
 
-import static java.util.Collections.unmodifiableSet;
-
-import java.io.DataOutputStream;
+import it.unimi.dsi.fastutil.Hash;
+import it.unimi.dsi.fastutil.ints.IntOpenHashSet;
+import it.unimi.dsi.fastutil.ints.IntSet;
+import it.unimi.dsi.fastutil.objects.Object2IntMap;
+import it.unimi.dsi.fastutil.objects.Object2IntOpenCustomHashMap;
 import java.io.IOException;
 import java.io.NotActiveException;
-import java.util.HashSet;
-import java.util.IdentityHashMap;
-import java.util.Map;
 import java.util.Objects;
-import java.util.Set;
 import org.apache.ignite.internal.network.serialization.ClassDescriptor;
+import org.apache.ignite.internal.util.io.IgniteDataOutput;
 
 /**
  * Context using during marshalling of an object graph accessible from a root object.
  */
 class MarshallingContext {
-    private final Set<ClassDescriptor> usedDescriptors = new HashSet<>();
+    private final IntSet usedDescriptorIds = new IntOpenHashSet();
 
-    private final Map<Object, Integer> objectsToIds = new IdentityHashMap<>();
+    private final Object2IntMap<Object> objectsToIds = new Object2IntOpenCustomHashMap<>(new IdentityHashStrategy());
 
     private int nextObjectId = 0;
 
@@ -45,11 +44,11 @@ class MarshallingContext {
     private UosObjectOutputStream objectOutputStream;
 
     public void addUsedDescriptor(ClassDescriptor descriptor) {
-        usedDescriptors.add(descriptor);
+        usedDescriptorIds.add(descriptor.descriptorId());
     }
 
-    public Set<ClassDescriptor> usedDescriptors() {
-        return unmodifiableSet(usedDescriptors);
+    public IntSet usedDescriptorIds() {
+        return usedDescriptorIds;
     }
 
     /**
@@ -71,9 +70,8 @@ class MarshallingContext {
             return FlaggedObjectIds.freshObjectId(newId);
         }
 
-        Integer prevId = objectsToIds.get(object);
-        if (prevId != null) {
-            return FlaggedObjectIds.alreadySeenObjectId(prevId);
+        if (objectsToIds.containsKey(object)) {
+            return FlaggedObjectIds.alreadySeenObjectId(objectsToIds.getInt(object));
         } else {
             int newId = nextId();
 
@@ -114,7 +112,7 @@ class MarshallingContext {
     }
 
     UosObjectOutputStream objectOutputStream(
-            DataOutputStream output,
+            IgniteDataOutput output,
             TypedValueWriter valueWriter,
             TypedValueWriter unsharedWriter,
             DefaultFieldsReaderWriter defaultFieldsReaderWriter
@@ -125,4 +123,18 @@ class MarshallingContext {
 
         return objectOutputStream;
     }
+
+    private static class IdentityHashStrategy implements Hash.Strategy<Object> {
+        /** {@inheritDoc} */
+        @Override
+        public int hashCode(Object o) {
+            return System.identityHashCode(o);
+        }
+
+        /** {@inheritDoc} */
+        @Override
+        public boolean equals(Object a, Object b) {
+            return a == b;
+        }
+    }
 }
diff --git a/modules/network/src/main/java/org/apache/ignite/internal/network/serialization/marshal/MarshallingValidations.java b/modules/network/src/main/java/org/apache/ignite/internal/network/serialization/marshal/MarshallingValidations.java
index 5d736f8..80a0e21 100644
--- a/modules/network/src/main/java/org/apache/ignite/internal/network/serialization/marshal/MarshallingValidations.java
+++ b/modules/network/src/main/java/org/apache/ignite/internal/network/serialization/marshal/MarshallingValidations.java
@@ -19,6 +19,9 @@ package org.apache.ignite.internal.network.serialization.marshal;
 
 import java.lang.reflect.Field;
 import java.lang.reflect.Modifier;
+import java.util.Map;
+import java.util.WeakHashMap;
+import java.util.function.Function;
 import org.apache.ignite.internal.network.serialization.Classes;
 import org.jetbrains.annotations.Nullable;
 
@@ -26,32 +29,58 @@ import org.jetbrains.annotations.Nullable;
  * Validations that are run before marshalling objects.
  */
 class MarshallingValidations {
-    static void throwIfMarshallingNotSupported(@Nullable Object object) {
+    private final Map<Class<?>, Marshallability> marshallabilityMap = new WeakHashMap<>();
+
+    private final Function<Class<?>, Marshallability> marshallabilityFunction = this::marshallability;
+
+    void throwIfMarshallingNotSupported(@Nullable Object object) {
         if (object == null) {
             return;
         }
-        if (Enum.class.isAssignableFrom(object.getClass())) {
-            return;
-        }
 
         Class<?> objectClass = object.getClass();
+
+        Marshallability marshallability = marshallabilityMap.computeIfAbsent(objectClass, marshallabilityFunction);
+
+        switch (marshallability) {
+            case INNER_CLASS:
+                throw new MarshallingNotSupportedException("Non-static inner class instances are not supported for marshalling: "
+                        + objectClass);
+            case CAPTUREING_CLOSURE:
+                throw new MarshallingNotSupportedException("Capturing nested class instances are not supported for marshalling: " + object);
+            case NON_SERIALIZABLE_LAMBDA:
+                throw new MarshallingNotSupportedException("Non-serializable lambda instances are not supported for marshalling: "
+                        + object);
+            default:
+                // do nothing
+        }
+    }
+
+    private Marshallability marshallability(Class<?> objectClass) {
+        if (Enum.class.isAssignableFrom(objectClass)) {
+            return Marshallability.OK;
+        }
+
         if (isInnerClass(objectClass)) {
-            throw new MarshallingNotSupportedException("Non-static inner class instances are not supported for marshalling: "
-                    + objectClass);
+            return Marshallability.INNER_CLASS;
         }
+
         if (isCapturingClosure(objectClass)) {
-            throw new MarshallingNotSupportedException("Capturing nested class instances are not supported for marshalling: " + object);
+            return Marshallability.CAPTUREING_CLOSURE;
         }
-        if (Classes.isLambda(objectClass) && !Classes.isSerializable(objectClass)) {
-            throw new MarshallingNotSupportedException("Non-serializable lambda instances are not supported for marshalling: " + object);
+
+        if (isNonSerializableLambda(objectClass)) {
+            return Marshallability.NON_SERIALIZABLE_LAMBDA;
         }
+
+        return Marshallability.OK;
     }
 
-    private static boolean isInnerClass(Class<?> objectClass) {
+    private boolean isInnerClass(Class<?> objectClass) {
         return objectClass.getDeclaringClass() != null && !Modifier.isStatic(objectClass.getModifiers());
     }
 
-    private static boolean isCapturingClosure(Class<?> objectClass) {
+    private boolean isCapturingClosure(Class<?> objectClass) {
         for (Field field : objectClass.getDeclaredFields()) {
             if ((field.isSynthetic() && field.getName().equals("this$0"))
                     || field.getName().startsWith("arg$")) {
@@ -62,6 +91,14 @@ class MarshallingValidations {
         return false;
     }
 
-    private MarshallingValidations() {
+    private boolean isNonSerializableLambda(Class<?> objectClass) {
+        return Classes.isLambda(objectClass) && !Classes.isSerializable(objectClass);
+    }
+
+    private enum Marshallability {
+        OK,
+        INNER_CLASS,
+        CAPTUREING_CLOSURE,
+        NON_SERIALIZABLE_LAMBDA
     }
 }
diff --git a/modules/network/src/main/java/org/apache/ignite/internal/network/serialization/marshal/ProtocolMarshalling.java b/modules/network/src/main/java/org/apache/ignite/internal/network/serialization/marshal/ProtocolMarshalling.java
index 7dcf45a..3ee474f 100644
--- a/modules/network/src/main/java/org/apache/ignite/internal/network/serialization/marshal/ProtocolMarshalling.java
+++ b/modules/network/src/main/java/org/apache/ignite/internal/network/serialization/marshal/ProtocolMarshalling.java
@@ -21,6 +21,7 @@ import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
 import java.util.BitSet;
+import org.apache.ignite.internal.util.io.VarInts;
 
 /**
  * Protocol-wide elements marshalling.
diff --git a/modules/network/src/main/java/org/apache/ignite/internal/network/serialization/marshal/ProxyMarshaller.java b/modules/network/src/main/java/org/apache/ignite/internal/network/serialization/marshal/ProxyMarshaller.java
index dfb8e7e..4ca724f 100644
--- a/modules/network/src/main/java/org/apache/ignite/internal/network/serialization/marshal/ProxyMarshaller.java
+++ b/modules/network/src/main/java/org/apache/ignite/internal/network/serialization/marshal/ProxyMarshaller.java
@@ -17,13 +17,13 @@
 
 package org.apache.ignite.internal.network.serialization.marshal;
 
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
 import java.io.IOException;
 import java.lang.reflect.Field;
 import java.lang.reflect.InvocationHandler;
 import java.lang.reflect.Method;
 import java.lang.reflect.Proxy;
+import org.apache.ignite.internal.util.io.IgniteDataInput;
+import org.apache.ignite.internal.util.io.IgniteDataOutput;
 
 /**
  * (Un)marshalling logic specific to {@link Proxy} instances.
@@ -61,7 +61,7 @@ class ProxyMarshaller {
         return Proxy.isProxyClass(classToCheck);
     }
 
-    void writeProxy(Object proxy, DataOutputStream output, MarshallingContext context) throws MarshalException, IOException {
+    void writeProxy(Object proxy, IgniteDataOutput output, MarshallingContext context) throws MarshalException, IOException {
         assert Proxy.isProxyClass(proxy.getClass());
 
         BuiltInMarshalling.writeClassArray(proxy.getClass().getInterfaces(), output, context);
@@ -69,18 +69,18 @@ class ProxyMarshaller {
         valueWriter.write(Proxy.getInvocationHandler(proxy), output, context);
     }
 
-    Object preInstantiateProxy(DataInputStream input, UnmarshallingContext context) throws UnmarshalException, IOException {
+    Object preInstantiateProxy(IgniteDataInput input, UnmarshallingContext context) throws UnmarshalException, IOException {
         Class<?>[] interfaces = BuiltInMarshalling.readClassArray(input, context);
 
         return Proxy.newProxyInstance(context.classLoader(), interfaces, placeholderInvocationHandler);
     }
 
-    void fillProxyFrom(DataInputStream input, Object proxyToFill, UnmarshallingContext context) throws UnmarshalException, IOException {
+    void fillProxyFrom(IgniteDataInput input, Object proxyToFill, UnmarshallingContext context) throws UnmarshalException, IOException {
         InvocationHandler invocationHandler = readInvocationHandler(input, context);
         replaceInvocationHandler(proxyToFill, invocationHandler);
     }
 
-    private InvocationHandler readInvocationHandler(DataInputStream input, UnmarshallingContext context)
+    private InvocationHandler readInvocationHandler(IgniteDataInput input, UnmarshallingContext context)
             throws IOException, UnmarshalException {
         Object object = valueReader.read(input, context);
         if (!(object instanceof InvocationHandler)) {
diff --git a/modules/network/src/main/java/org/apache/ignite/internal/network/serialization/marshal/StructuredObjectMarshaller.java b/modules/network/src/main/java/org/apache/ignite/internal/network/serialization/marshal/StructuredObjectMarshaller.java
index 18ffa33..11ff853 100644
--- a/modules/network/src/main/java/org/apache/ignite/internal/network/serialization/marshal/StructuredObjectMarshaller.java
+++ b/modules/network/src/main/java/org/apache/ignite/internal/network/serialization/marshal/StructuredObjectMarshaller.java
@@ -18,12 +18,9 @@
 package org.apache.ignite.internal.network.serialization.marshal;
 
 import java.io.DataInput;
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
+import java.io.DataOutput;
 import java.io.IOException;
-import java.util.ArrayList;
 import java.util.BitSet;
-import java.util.Collections;
 import java.util.List;
 import org.apache.ignite.internal.network.serialization.BuiltInTypeIds;
 import org.apache.ignite.internal.network.serialization.ClassDescriptor;
@@ -33,6 +30,8 @@ import org.apache.ignite.internal.network.serialization.IdIndexedDescriptors;
 import org.apache.ignite.internal.network.serialization.SpecialMethodInvocationException;
 import org.apache.ignite.internal.network.serialization.marshal.UosObjectInputStream.UosGetField;
 import org.apache.ignite.internal.network.serialization.marshal.UosObjectOutputStream.UosPutField;
+import org.apache.ignite.internal.util.io.IgniteDataInput;
+import org.apache.ignite.internal.util.io.IgniteDataOutput;
 import org.jetbrains.annotations.Nullable;
 
 /**
@@ -68,34 +67,17 @@ class StructuredObjectMarshaller implements DefaultFieldsReaderWriter {
         );
     }
 
-    void writeStructuredObject(Object object, ClassDescriptor descriptor, DataOutputStream output, MarshallingContext context)
+    void writeStructuredObject(Object object, ClassDescriptor descriptor, IgniteDataOutput output, MarshallingContext context)
             throws MarshalException, IOException {
-        for (ClassDescriptor layer : lineage(descriptor)) {
+        List<ClassDescriptor> lineage = descriptor.lineage();
+
+        // using a for loop to avoid allocation of an iterator
+        for (ClassDescriptor layer : lineage) {
             writeStructuredObjectLayer(object, layer, output, context);
         }
     }
 
-    /**
-     * Returns the lineage (all the ancestors, from the progenitor (excluding Object) down the line, including the given class).
-     *
-     * @param descriptor class from which to obtain lineage
-     * @return ancestors from the progenitor (excluding Object) down the line, plus the given class itself
-     */
-    private List<ClassDescriptor> lineage(ClassDescriptor descriptor) {
-        List<ClassDescriptor> descriptors = new ArrayList<>();
-
-        ClassDescriptor currentDesc = descriptor;
-        while (currentDesc != null) {
-            descriptors.add(currentDesc);
-            currentDesc = currentDesc.superClassDescriptor();
-        }
-
-        Collections.reverse(descriptors);
-
-        return descriptors;
-    }
-
-    private void writeStructuredObjectLayer(Object object, ClassDescriptor layer, DataOutputStream output, MarshallingContext context)
+    private void writeStructuredObjectLayer(Object object, ClassDescriptor layer, IgniteDataOutput output, MarshallingContext context)
             throws IOException, MarshalException {
         if (layer.hasWriteObject()) {
             writeWithWriteObject(object, layer, output, context);
@@ -106,7 +88,7 @@ class StructuredObjectMarshaller implements DefaultFieldsReaderWriter {
         context.addUsedDescriptor(layer);
     }
 
-    private void writeWithWriteObject(Object object, ClassDescriptor descriptor, DataOutputStream output, MarshallingContext context)
+    private void writeWithWriteObject(Object object, ClassDescriptor descriptor, IgniteDataOutput output, MarshallingContext context)
             throws IOException, MarshalException {
         // Do not close the stream yet!
         UosObjectOutputStream oos = context.objectOutputStream(output, valueWriter, unsharedWriter, this);
@@ -127,7 +109,7 @@ class StructuredObjectMarshaller implements DefaultFieldsReaderWriter {
 
     /** {@inheritDoc} */
     @Override
-    public void defaultWriteFields(Object object, ClassDescriptor descriptor, DataOutputStream output, MarshallingContext context)
+    public void defaultWriteFields(Object object, ClassDescriptor descriptor, IgniteDataOutput output, MarshallingContext context)
             throws MarshalException, IOException {
         @Nullable BitSet nullsBitSet = writeNullsBitSet(object, descriptor, output);
 
@@ -149,7 +131,7 @@ class StructuredObjectMarshaller implements DefaultFieldsReaderWriter {
     /** {@inheritDoc} */
     @Override
     @Nullable
-    public BitSet writeNullsBitSet(Object object, ClassDescriptor descriptor, DataOutputStream output) throws IOException {
+    public BitSet writeNullsBitSet(Object object, ClassDescriptor descriptor, DataOutput output) throws IOException {
         BitSet nullsBitSet = descriptor.fieldIndexInNullsBitmapSize() == 0 ? null : new BitSet(descriptor.fieldIndexInNullsBitmapSize());
 
         for (FieldDescriptor fieldDescriptor : descriptor.fields()) {
@@ -170,7 +152,7 @@ class StructuredObjectMarshaller implements DefaultFieldsReaderWriter {
         return nullsBitSet;
     }
 
-    private void writeField(Object object, FieldDescriptor fieldDescriptor, DataOutputStream output, MarshallingContext context)
+    private void writeField(Object object, FieldDescriptor fieldDescriptor, IgniteDataOutput output, MarshallingContext context)
             throws MarshalException, IOException {
         if (fieldDescriptor.clazz().isPrimitive()) {
             writePrimitiveFieldValue(object, fieldDescriptor, output);
@@ -186,7 +168,7 @@ class StructuredObjectMarshaller implements DefaultFieldsReaderWriter {
         return fieldDescriptor.accessor().getObject(object);
     }
 
-    private void writePrimitiveFieldValue(Object object, FieldDescriptor fieldDescriptor, DataOutputStream output) throws IOException {
+    private void writePrimitiveFieldValue(Object object, FieldDescriptor fieldDescriptor, IgniteDataOutput output) throws IOException {
         FieldAccessor fieldAccessor = fieldDescriptor.accessor();
 
         switch (fieldDescriptor.typeDescriptorId()) {
@@ -227,14 +209,17 @@ class StructuredObjectMarshaller implements DefaultFieldsReaderWriter {
         }
     }
 
-    void fillStructuredObjectFrom(DataInputStream input, Object object, ClassDescriptor descriptor, UnmarshallingContext context)
+    void fillStructuredObjectFrom(IgniteDataInput input, Object object, ClassDescriptor descriptor, UnmarshallingContext context)
             throws IOException, UnmarshalException {
-        for (ClassDescriptor layer : lineage(descriptor)) {
+        List<ClassDescriptor> lineage = descriptor.lineage();
+
+        // using a for loop to avoid allocation of an iterator
+        for (ClassDescriptor layer : lineage) {
             fillStructuredObjectLayerFrom(input, layer, object, context);
         }
     }
 
-    private void fillStructuredObjectLayerFrom(DataInputStream input, ClassDescriptor layer, Object object, UnmarshallingContext context)
+    private void fillStructuredObjectLayerFrom(IgniteDataInput input, ClassDescriptor layer, Object object, UnmarshallingContext context)
             throws IOException, UnmarshalException {
         if (layer.hasReadObject()) {
             fillObjectWithReadObjectFrom(input, object, layer, context);
@@ -244,7 +229,7 @@ class StructuredObjectMarshaller implements DefaultFieldsReaderWriter {
     }
 
     private void fillObjectWithReadObjectFrom(
-            DataInputStream input,
+            IgniteDataInput input,
             Object object,
             ClassDescriptor descriptor,
             UnmarshallingContext context
@@ -267,7 +252,7 @@ class StructuredObjectMarshaller implements DefaultFieldsReaderWriter {
 
     /** {@inheritDoc} */
     @Override
-    public void defaultFillFieldsFrom(DataInputStream input, Object object, ClassDescriptor descriptor, UnmarshallingContext context)
+    public void defaultFillFieldsFrom(IgniteDataInput input, Object object, ClassDescriptor descriptor, UnmarshallingContext context)
             throws IOException, UnmarshalException {
         @Nullable BitSet nullsBitSet = readNullsBitSet(input, descriptor);
 
@@ -299,7 +284,7 @@ class StructuredObjectMarshaller implements DefaultFieldsReaderWriter {
         return ProtocolMarshalling.readFixedLengthBitSet(descriptor.fieldIndexInNullsBitmapSize(), input);
     }
 
-    private void fillFieldFrom(DataInputStream input, Object object, UnmarshallingContext context, FieldDescriptor fieldDescriptor)
+    private void fillFieldFrom(IgniteDataInput input, Object object, UnmarshallingContext context, FieldDescriptor fieldDescriptor)
             throws IOException, UnmarshalException {
         if (fieldDescriptor.clazz().isPrimitive()) {
             fillPrimitiveFieldFrom(input, object, fieldDescriptor);
@@ -313,7 +298,7 @@ class StructuredObjectMarshaller implements DefaultFieldsReaderWriter {
         fieldDescriptor.accessor().setObject(object, fieldValue);
     }
 
-    private void fillPrimitiveFieldFrom(DataInputStream input, Object object, FieldDescriptor fieldDescriptor) throws IOException {
+    private void fillPrimitiveFieldFrom(DataInput input, Object object, FieldDescriptor fieldDescriptor) throws IOException {
         FieldAccessor fieldAccessor = fieldDescriptor.accessor();
 
         switch (fieldDescriptor.typeDescriptorId()) {
diff --git a/modules/network/src/main/java/org/apache/ignite/internal/network/serialization/marshal/TypedValueReader.java b/modules/network/src/main/java/org/apache/ignite/internal/network/serialization/marshal/TypedValueReader.java
index cc5894a..2224b3e 100644
--- a/modules/network/src/main/java/org/apache/ignite/internal/network/serialization/marshal/TypedValueReader.java
+++ b/modules/network/src/main/java/org/apache/ignite/internal/network/serialization/marshal/TypedValueReader.java
@@ -17,17 +17,17 @@
 
 package org.apache.ignite.internal.network.serialization.marshal;
 
-import java.io.DataInputStream;
 import java.io.IOException;
+import org.apache.ignite.internal.util.io.IgniteDataInput;
 import org.jetbrains.annotations.Nullable;
 
 /**
- * Knows how to read a value from a {@link DataInputStream} and, optionally, knows the expected type (might be taken from
+ * Knows how to read a value from an input and, optionally, knows the expected type (might be taken from
  * a field or be an array component type).
  */
 interface TypedValueReader {
     /**
-     * Reads the next value from a {@link DataInputStream}.
+     * Reads the next value from an input.
      *
      * @param input     from where to read
      * @param declaredClass the original class of the object (i.e. {@code byte.class} for {@code byte}); might be {@code null}
@@ -37,6 +37,6 @@ interface TypedValueReader {
      * @throws IOException          if an I/O problem occurs
      * @throws UnmarshalException   if another problem (like {@link ClassNotFoundException}) occurs
      */
-    Object read(DataInputStream input, @Nullable Class<?> declaredClass, UnmarshallingContext context)
+    Object read(IgniteDataInput input, @Nullable Class<?> declaredClass, UnmarshallingContext context)
             throws IOException, UnmarshalException;
 }
diff --git a/modules/network/src/main/java/org/apache/ignite/internal/network/serialization/marshal/TypedValueWriter.java b/modules/network/src/main/java/org/apache/ignite/internal/network/serialization/marshal/TypedValueWriter.java
index 4c89911..69f3a0d 100644
--- a/modules/network/src/main/java/org/apache/ignite/internal/network/serialization/marshal/TypedValueWriter.java
+++ b/modules/network/src/main/java/org/apache/ignite/internal/network/serialization/marshal/TypedValueWriter.java
@@ -17,17 +17,17 @@
 
 package org.apache.ignite.internal.network.serialization.marshal;
 
-import java.io.DataOutputStream;
 import java.io.IOException;
+import org.apache.ignite.internal.util.io.IgniteDataOutput;
 import org.jetbrains.annotations.Nullable;
 
 /**
- * Writes objects to a {@link DataOutputStream} taking their declared types into consideration (if any).
+ * Writes objects to an output taking their declared types into consideration (if any).
  * The type might come from a field or be an array component type.
  */
 interface TypedValueWriter {
     /**
-     * Writes the given object to the {@link DataOutputStream}.
+     * Writes the given object to the output.
      *
      * @param object        object to write
      * @param declaredClass the original class of the object (i.e. {@code byte.class} for {@code byte}); might be {@code null}
@@ -37,6 +37,6 @@ interface TypedValueWriter {
      * @throws IOException      if an I/O problem occurs
      * @throws MarshalException if another problem occurs
      */
-    void write(Object object, @Nullable Class<?> declaredClass, DataOutputStream output, MarshallingContext context)
+    void write(Object object, @Nullable Class<?> declaredClass, IgniteDataOutput output, MarshallingContext context)
             throws IOException, MarshalException;
 }
diff --git a/modules/network/src/main/java/org/apache/ignite/internal/network/serialization/marshal/UnmarshallingContext.java b/modules/network/src/main/java/org/apache/ignite/internal/network/serialization/marshal/UnmarshallingContext.java
index ffa9f8e..9ac6e6f 100644
--- a/modules/network/src/main/java/org/apache/ignite/internal/network/serialization/marshal/UnmarshallingContext.java
+++ b/modules/network/src/main/java/org/apache/ignite/internal/network/serialization/marshal/UnmarshallingContext.java
@@ -17,27 +17,26 @@
 
 package org.apache.ignite.internal.network.serialization.marshal;
 
+import it.unimi.dsi.fastutil.ints.Int2ObjectMap;
+import it.unimi.dsi.fastutil.ints.Int2ObjectOpenHashMap;
 import it.unimi.dsi.fastutil.ints.IntOpenHashSet;
 import it.unimi.dsi.fastutil.ints.IntSet;
-import java.io.ByteArrayInputStream;
-import java.io.DataInputStream;
 import java.io.IOException;
 import java.io.NotActiveException;
-import java.util.HashMap;
-import java.util.Map;
 import org.apache.ignite.internal.network.serialization.ClassDescriptor;
 import org.apache.ignite.internal.network.serialization.DescriptorRegistry;
+import org.apache.ignite.internal.util.io.IgniteDataInput;
 import org.jetbrains.annotations.Nullable;
 
 /**
  * Context of unmarshalling act. Created once per unmarshalling a root object.
  */
 class UnmarshallingContext implements DescriptorRegistry {
-    private final ByteArrayInputStream source;
+    private final IgniteDataInput source;
     private final DescriptorRegistry descriptors;
     private final ClassLoader classLoader;
 
-    private final Map<Integer, Object> idsToObjects = new HashMap<>();
+    private final Int2ObjectMap<Object> idsToObjects = new Int2ObjectOpenHashMap<>();
     private final IntSet unsharedObjectIds = new IntOpenHashSet();
 
     private Object objectCurrentlyReadWithReadObject;
@@ -45,7 +44,7 @@ class UnmarshallingContext implements DescriptorRegistry {
 
     private UosObjectInputStream objectInputStream;
 
-    public UnmarshallingContext(ByteArrayInputStream source, DescriptorRegistry descriptors, ClassLoader classLoader) {
+    public UnmarshallingContext(IgniteDataInput source, DescriptorRegistry descriptors, ClassLoader classLoader) {
         this.source = source;
         this.descriptors = descriptors;
         this.classLoader = classLoader;
@@ -113,7 +112,7 @@ class UnmarshallingContext implements DescriptorRegistry {
         source.mark(readAheadLimit);
     }
 
-    public void resetSourceToMark() {
+    public void resetSourceToMark() throws IOException {
         source.reset();
     }
 
@@ -144,7 +143,7 @@ class UnmarshallingContext implements DescriptorRegistry {
     }
 
     UosObjectInputStream objectInputStream(
-            DataInputStream input,
+            IgniteDataInput input,
             TypedValueReader valueReader,
             TypedValueReader unsharedReader,
             DefaultFieldsReaderWriter defaultFieldsReaderWriter
diff --git a/modules/network/src/main/java/org/apache/ignite/internal/network/serialization/marshal/ValueWriter.java b/modules/network/src/main/java/org/apache/ignite/internal/network/serialization/marshal/UosIgniteOutputStream.java
similarity index 60%
copy from modules/network/src/main/java/org/apache/ignite/internal/network/serialization/marshal/ValueWriter.java
copy to modules/network/src/main/java/org/apache/ignite/internal/network/serialization/marshal/UosIgniteOutputStream.java
index f7b2ede..dd8f8b8 100644
--- a/modules/network/src/main/java/org/apache/ignite/internal/network/serialization/marshal/ValueWriter.java
+++ b/modules/network/src/main/java/org/apache/ignite/internal/network/serialization/marshal/UosIgniteOutputStream.java
@@ -17,21 +17,27 @@
 
 package org.apache.ignite.internal.network.serialization.marshal;
 
-import java.io.DataOutputStream;
-import java.io.IOException;
+import org.apache.ignite.internal.util.io.IgniteUnsafeDataOutput;
 
 /**
- * Knows how to write a value to a {@link DataOutputStream}.
+ * {@link IgniteUnsafeDataOutput} extension that allows to track whether it's occupied or not.
  */
-interface ValueWriter<T> {
-    /**
-     * Writes the given value to a {@link DataOutputStream}.
-     *
-     * @param value     value to write
-     * @param output    where to write to
-     * @param context   marshalling context
-     * @throws IOException      if an I/O problem occurs
-     * @throws MarshalException if another problem occurs
-     */
-    void write(T value, DataOutputStream output, MarshallingContext context) throws IOException, MarshalException;
+class UosIgniteOutputStream extends IgniteUnsafeDataOutput {
+    private boolean occupied = false;
+
+    public UosIgniteOutputStream(int size) {
+        super(size);
+    }
+
+    boolean isOccupied() {
+        return occupied;
+    }
+
+    void occupy() {
+        occupied = true;
+    }
+
+    void release() {
+        occupied = false;
+    }
 }
diff --git a/modules/network/src/main/java/org/apache/ignite/internal/network/serialization/marshal/UosObjectInputStream.java b/modules/network/src/main/java/org/apache/ignite/internal/network/serialization/marshal/UosObjectInputStream.java
index 3b89e7d..42eb272 100644
--- a/modules/network/src/main/java/org/apache/ignite/internal/network/serialization/marshal/UosObjectInputStream.java
+++ b/modules/network/src/main/java/org/apache/ignite/internal/network/serialization/marshal/UosObjectInputStream.java
@@ -18,7 +18,6 @@
 package org.apache.ignite.internal.network.serialization.marshal;
 
 import java.io.DataInput;
-import java.io.DataInputStream;
 import java.io.IOException;
 import java.io.ObjectInputStream;
 import java.io.ObjectStreamClass;
@@ -26,13 +25,14 @@ import java.util.BitSet;
 import org.apache.ignite.internal.network.serialization.ClassDescriptor;
 import org.apache.ignite.internal.network.serialization.FieldDescriptor;
 import org.apache.ignite.internal.network.serialization.Primitives;
+import org.apache.ignite.internal.util.io.IgniteDataInput;
 import org.jetbrains.annotations.Nullable;
 
 /**
  * {@link ObjectInputStream} specialization used by User Object Serialization.
  */
 class UosObjectInputStream extends ObjectInputStream {
-    private final DataInputStream input;
+    private final IgniteDataInput input;
     private final TypedValueReader valueReader;
     private final TypedValueReader unsharedReader;
     private final DefaultFieldsReaderWriter defaultFieldsReaderWriter;
@@ -41,7 +41,7 @@ class UosObjectInputStream extends ObjectInputStream {
     private UosGetField currentGet;
 
     UosObjectInputStream(
-            DataInputStream input,
+            IgniteDataInput input,
             TypedValueReader valueReader,
             TypedValueReader unsharedReader,
             DefaultFieldsReaderWriter defaultFieldsReaderWriter,
@@ -73,6 +73,11 @@ class UosObjectInputStream extends ObjectInputStream {
         return input.read(buf, off, len);
     }
 
+    @Override
+    public byte[] readAllBytes() throws IOException {
+        return super.readAllBytes();
+    }
+
     /** {@inheritDoc} */
     @Override
     public byte readByte() throws IOException {
@@ -279,7 +284,7 @@ class UosObjectInputStream extends ObjectInputStream {
         /** {@inheritDoc} */
         @Override
         public boolean get(String name, boolean val) throws IOException {
-            return Bits.getBoolean(primitiveFieldsData, primitiveFieldDataOffset(name, boolean.class));
+            return LittleEndianBits.getBoolean(primitiveFieldsData, primitiveFieldDataOffset(name, boolean.class));
         }
 
         /** {@inheritDoc} */
@@ -291,37 +296,37 @@ class UosObjectInputStream extends ObjectInputStream {
         /** {@inheritDoc} */
         @Override
         public char get(String name, char val) throws IOException {
-            return Bits.getChar(primitiveFieldsData, primitiveFieldDataOffset(name, char.class));
+            return LittleEndianBits.getChar(primitiveFieldsData, primitiveFieldDataOffset(name, char.class));
         }
 
         /** {@inheritDoc} */
         @Override
         public short get(String name, short val) throws IOException {
-            return Bits.getShort(primitiveFieldsData, primitiveFieldDataOffset(name, short.class));
+            return LittleEndianBits.getShort(primitiveFieldsData, primitiveFieldDataOffset(name, short.class));
         }
 
         /** {@inheritDoc} */
         @Override
         public int get(String name, int val) throws IOException {
-            return Bits.getInt(primitiveFieldsData, primitiveFieldDataOffset(name, int.class));
+            return LittleEndianBits.getInt(primitiveFieldsData, primitiveFieldDataOffset(name, int.class));
         }
 
         /** {@inheritDoc} */
         @Override
         public long get(String name, long val) throws IOException {
-            return Bits.getLong(primitiveFieldsData, primitiveFieldDataOffset(name, long.class));
+            return LittleEndianBits.getLong(primitiveFieldsData, primitiveFieldDataOffset(name, long.class));
         }
 
         /** {@inheritDoc} */
         @Override
         public float get(String name, float val) throws IOException {
-            return Bits.getFloat(primitiveFieldsData, primitiveFieldDataOffset(name, float.class));
+            return LittleEndianBits.getFloat(primitiveFieldsData, primitiveFieldDataOffset(name, float.class));
         }
 
         /** {@inheritDoc} */
         @Override
         public double get(String name, double val) throws IOException {
-            return Bits.getDouble(primitiveFieldsData, primitiveFieldDataOffset(name, double.class));
+            return LittleEndianBits.getDouble(primitiveFieldsData, primitiveFieldDataOffset(name, double.class));
         }
 
         /** {@inheritDoc} */
diff --git a/modules/network/src/main/java/org/apache/ignite/internal/network/serialization/marshal/UosObjectOutputStream.java b/modules/network/src/main/java/org/apache/ignite/internal/network/serialization/marshal/UosObjectOutputStream.java
index 79dde97..d4ae13d 100644
--- a/modules/network/src/main/java/org/apache/ignite/internal/network/serialization/marshal/UosObjectOutputStream.java
+++ b/modules/network/src/main/java/org/apache/ignite/internal/network/serialization/marshal/UosObjectOutputStream.java
@@ -17,7 +17,6 @@
 
 package org.apache.ignite.internal.network.serialization.marshal;
 
-import java.io.DataOutputStream;
 import java.io.IOException;
 import java.io.NotActiveException;
 import java.io.ObjectOutput;
@@ -26,13 +25,14 @@ import java.util.BitSet;
 import org.apache.ignite.internal.network.serialization.ClassDescriptor;
 import org.apache.ignite.internal.network.serialization.FieldDescriptor;
 import org.apache.ignite.internal.network.serialization.Primitives;
+import org.apache.ignite.internal.util.io.IgniteDataOutput;
 import org.jetbrains.annotations.Nullable;
 
 /**
  * {@link ObjectOutputStream} specialization used by User Object Serialization.
  */
 class UosObjectOutputStream extends ObjectOutputStream {
-    private final DataOutputStream output;
+    private final IgniteDataOutput output;
     private final TypedValueWriter valueWriter;
     private final TypedValueWriter unsharedWriter;
     private final DefaultFieldsReaderWriter defaultFieldsReaderWriter;
@@ -41,7 +41,7 @@ class UosObjectOutputStream extends ObjectOutputStream {
     private UosPutField currentPut;
 
     UosObjectOutputStream(
-            DataOutputStream output,
+            IgniteDataOutput output,
             TypedValueWriter valueWriter,
             TypedValueWriter unsharedWriter, DefaultFieldsReaderWriter defaultFieldsReaderWriter,
             MarshallingContext context
@@ -253,7 +253,7 @@ class UosObjectOutputStream extends ObjectOutputStream {
         /** {@inheritDoc} */
         @Override
         public void put(String name, boolean val) {
-            Bits.putBoolean(primitiveFieldsData, primitiveFieldDataOffset(name, boolean.class), val);
+            LittleEndianBits.putBoolean(primitiveFieldsData, primitiveFieldDataOffset(name, boolean.class), val);
         }
 
         /** {@inheritDoc} */
@@ -265,37 +265,37 @@ class UosObjectOutputStream extends ObjectOutputStream {
         /** {@inheritDoc} */
         @Override
         public void put(String name, char val) {
-            Bits.putChar(primitiveFieldsData, primitiveFieldDataOffset(name, char.class), val);
+            LittleEndianBits.putChar(primitiveFieldsData, primitiveFieldDataOffset(name, char.class), val);
         }
 
         /** {@inheritDoc} */
         @Override
         public void put(String name, short val) {
-            Bits.putShort(primitiveFieldsData, primitiveFieldDataOffset(name, short.class), val);
+            LittleEndianBits.putShort(primitiveFieldsData, primitiveFieldDataOffset(name, short.class), val);
         }
 
         /** {@inheritDoc} */
         @Override
         public void put(String name, int val) {
-            Bits.putInt(primitiveFieldsData, primitiveFieldDataOffset(name, int.class), val);
+            LittleEndianBits.putInt(primitiveFieldsData, primitiveFieldDataOffset(name, int.class), val);
         }
 
         /** {@inheritDoc} */
         @Override
         public void put(String name, long val) {
-            Bits.putLong(primitiveFieldsData, primitiveFieldDataOffset(name, long.class), val);
+            LittleEndianBits.putLong(primitiveFieldsData, primitiveFieldDataOffset(name, long.class), val);
         }
 
         /** {@inheritDoc} */
         @Override
         public void put(String name, float val) {
-            Bits.putFloat(primitiveFieldsData, primitiveFieldDataOffset(name, float.class), val);
+            LittleEndianBits.putFloat(primitiveFieldsData, primitiveFieldDataOffset(name, float.class), val);
         }
 
         /** {@inheritDoc} */
         @Override
         public void put(String name, double val) {
-            Bits.putDouble(primitiveFieldsData, primitiveFieldDataOffset(name, double.class), val);
+            LittleEndianBits.putDouble(primitiveFieldsData, primitiveFieldDataOffset(name, double.class), val);
         }
 
         /** {@inheritDoc} */
diff --git a/modules/network/src/main/java/org/apache/ignite/internal/network/serialization/marshal/ValueReader.java b/modules/network/src/main/java/org/apache/ignite/internal/network/serialization/marshal/ValueReader.java
index 1ab25a1..240a575 100644
--- a/modules/network/src/main/java/org/apache/ignite/internal/network/serialization/marshal/ValueReader.java
+++ b/modules/network/src/main/java/org/apache/ignite/internal/network/serialization/marshal/ValueReader.java
@@ -17,15 +17,15 @@
 
 package org.apache.ignite.internal.network.serialization.marshal;
 
-import java.io.DataInputStream;
 import java.io.IOException;
+import org.apache.ignite.internal.util.io.IgniteDataInput;
 
 /**
- * Knows how to read a value from a {@link DataInputStream}.
+ * Knows how to read a value from an input.
  */
 interface ValueReader<T> {
     /**
-     * Reads the next value from a {@link DataInputStream}.
+     * Reads the next value from an input.
      *
      * @param input     from where to read
      * @param context   unmarshalling context
@@ -33,5 +33,5 @@ interface ValueReader<T> {
      * @throws IOException          if an I/O problem occurs
      * @throws UnmarshalException   if another problem (like {@link ClassNotFoundException}) occurs
      */
-    T read(DataInputStream input, UnmarshallingContext context) throws IOException, UnmarshalException;
+    T read(IgniteDataInput input, UnmarshallingContext context) throws IOException, UnmarshalException;
 }
diff --git a/modules/network/src/main/java/org/apache/ignite/internal/network/serialization/marshal/ValueWriter.java b/modules/network/src/main/java/org/apache/ignite/internal/network/serialization/marshal/ValueWriter.java
index f7b2ede..a4ce6ee 100644
--- a/modules/network/src/main/java/org/apache/ignite/internal/network/serialization/marshal/ValueWriter.java
+++ b/modules/network/src/main/java/org/apache/ignite/internal/network/serialization/marshal/ValueWriter.java
@@ -17,15 +17,15 @@
 
 package org.apache.ignite.internal.network.serialization.marshal;
 
-import java.io.DataOutputStream;
 import java.io.IOException;
+import org.apache.ignite.internal.util.io.IgniteDataOutput;
 
 /**
- * Knows how to write a value to a {@link DataOutputStream}.
+ * Knows how to write a value to an output.
  */
 interface ValueWriter<T> {
     /**
-     * Writes the given value to a {@link DataOutputStream}.
+     * Writes the given value to an output.
      *
      * @param value     value to write
      * @param output    where to write to
@@ -33,5 +33,5 @@ interface ValueWriter<T> {
      * @throws IOException      if an I/O problem occurs
      * @throws MarshalException if another problem occurs
      */
-    void write(T value, DataOutputStream output, MarshallingContext context) throws IOException, MarshalException;
+    void write(T value, IgniteDataOutput output, MarshallingContext context) throws IOException, MarshalException;
 }
diff --git a/modules/network/src/test/java/org/apache/ignite/internal/network/AllTypesMessage.java b/modules/network/src/test/java/org/apache/ignite/internal/network/AllTypesMessage.java
index a8d0504..4a190e0 100644
--- a/modules/network/src/test/java/org/apache/ignite/internal/network/AllTypesMessage.java
+++ b/modules/network/src/test/java/org/apache/ignite/internal/network/AllTypesMessage.java
@@ -17,6 +17,7 @@
 
 package org.apache.ignite.internal.network;
 
+import java.io.Serializable;
 import java.util.BitSet;
 import java.util.Collection;
 import java.util.Map;
@@ -30,7 +31,7 @@ import org.apache.ignite.network.annotations.Transferable;
  * {@link NetworkMessage} implementation.
  */
 @Transferable(TestMessageTypes.ALL_TYPES)
-public interface AllTypesMessage extends NetworkMessage {
+public interface AllTypesMessage extends NetworkMessage, Serializable {
     byte byteA();
 
     short shortB();
diff --git a/modules/network/src/test/java/org/apache/ignite/internal/network/AllTypesMessageGenerator.java b/modules/network/src/test/java/org/apache/ignite/internal/network/AllTypesMessageGenerator.java
index 883a051..43692e7 100644
--- a/modules/network/src/test/java/org/apache/ignite/internal/network/AllTypesMessageGenerator.java
+++ b/modules/network/src/test/java/org/apache/ignite/internal/network/AllTypesMessageGenerator.java
@@ -17,11 +17,13 @@
 
 package org.apache.ignite.internal.network;
 
+import static java.util.stream.Collectors.toList;
+import static java.util.stream.Collectors.toMap;
+
 import java.lang.reflect.Field;
 import java.util.BitSet;
 import java.util.Random;
 import java.util.UUID;
-import java.util.stream.Collectors;
 import java.util.stream.IntStream;
 import org.apache.ignite.lang.IgniteUuid;
 import org.apache.ignite.lang.IgniteUuidGenerator;
@@ -33,6 +35,10 @@ import org.jetbrains.annotations.Nullable;
  * Generator for an {@link AllTypesMessage}.
  */
 public class AllTypesMessageGenerator {
+    public static AllTypesMessage generate(long seed, boolean nestedMsg) {
+        return generate(seed, nestedMsg, true);
+    }
+
     /**
      * Generate a new {@link AllTypesMessage}.
      *
@@ -40,7 +46,7 @@ public class AllTypesMessageGenerator {
      * @param nestedMsg {@code true} if nested messages should be generated, {@code false} otherwise.
      * @return Message.
      */
-    public static AllTypesMessage generate(long seed, boolean nestedMsg) {
+    public static AllTypesMessage generate(long seed, boolean nestedMsg, boolean fillArrays) {
         try {
             var random = new Random(seed);
 
@@ -51,21 +57,23 @@ public class AllTypesMessageGenerator {
             for (Field field : fields) {
                 field.setAccessible(true);
 
-                field.set(message, randomValue(random, field, nestedMsg));
+                if (!field.getType().isArray() || fillArrays) {
+                    field.set(message, randomValue(random, field, nestedMsg));
+                }
             }
 
             if (nestedMsg) {
                 message.netMsgArrV(
-                        IntStream.range(0, 10).mapToObj(unused -> generate(seed, false)).toArray(NetworkMessage[]::new)
+                        IntStream.range(0, 10).mapToObj(unused -> generate(seed, false, fillArrays)).toArray(NetworkMessage[]::new)
                 );
 
                 message.netMsgCollW(IntStream.range(0, 10)
-                        .mapToObj(unused -> generate(seed, false))
-                        .collect(Collectors.toList()));
+                        .mapToObj(unused -> generate(seed, false, fillArrays))
+                        .collect(toList()));
 
                 message.newMsgMapX(IntStream.range(0, 10)
                         .boxed()
-                        .collect(Collectors.toMap(String::valueOf, unused -> generate(seed, false))));
+                        .collect(toMap(String::valueOf, unused -> generate(seed, false, fillArrays))));
             }
 
             return message.build();
diff --git a/modules/network/src/test/java/org/apache/ignite/internal/network/SerializationMicroBenchmark.java b/modules/network/src/test/java/org/apache/ignite/internal/network/SerializationMicroBenchmark.java
new file mode 100644
index 0000000..c4df17a
--- /dev/null
+++ b/modules/network/src/test/java/org/apache/ignite/internal/network/SerializationMicroBenchmark.java
@@ -0,0 +1,307 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.network;
+
+import com.esotericsoftware.kryo.Kryo;
+import com.esotericsoftware.kryo.io.Input;
+import com.esotericsoftware.kryo.io.Output;
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.io.Serializable;
+import java.util.concurrent.TimeUnit;
+import org.apache.ignite.internal.network.serialization.ClassDescriptorFactory;
+import org.apache.ignite.internal.network.serialization.ClassDescriptorRegistry;
+import org.apache.ignite.internal.network.serialization.marshal.DefaultUserObjectMarshaller;
+import org.apache.ignite.internal.network.serialization.marshal.MarshalException;
+import org.apache.ignite.internal.network.serialization.marshal.MarshalledObject;
+import org.apache.ignite.internal.network.serialization.marshal.UnmarshalException;
+import org.apache.ignite.internal.network.serialization.marshal.UserObjectMarshaller;
+import org.jetbrains.annotations.Nullable;
+import org.objenesis.strategy.StdInstantiatorStrategy;
+import org.openjdk.jmh.annotations.Benchmark;
+import org.openjdk.jmh.annotations.BenchmarkMode;
+import org.openjdk.jmh.annotations.Fork;
+import org.openjdk.jmh.annotations.Measurement;
+import org.openjdk.jmh.annotations.Mode;
+import org.openjdk.jmh.annotations.OutputTimeUnit;
+import org.openjdk.jmh.annotations.Scope;
+import org.openjdk.jmh.annotations.State;
+import org.openjdk.jmh.annotations.Warmup;
+import org.openjdk.jmh.infra.Blackhole;
+import org.openjdk.jmh.runner.Runner;
+import org.openjdk.jmh.runner.options.Options;
+import org.openjdk.jmh.runner.options.OptionsBuilder;
+
+/**
+ * A micro-benchmark of {@link DefaultUserObjectMarshaller}.
+ */
+@Warmup(iterations = 5, time = 1, timeUnit = TimeUnit.SECONDS)
+@Measurement(iterations = 5, time = 1, timeUnit = TimeUnit.SECONDS)
+@Fork(3)
+@BenchmarkMode(Mode.AverageTime)
+@OutputTimeUnit(TimeUnit.NANOSECONDS)
+@State(Scope.Thread)
+public class SerializationMicroBenchmark {
+
+    static TestClass smallObject;
+    static AllTypesMessage mediumObject;
+    static AllTypesMessage largeObject;
+
+    static ClassDescriptorRegistry registry;
+
+    static UserObjectMarshaller userObjectMarshaller;
+
+    static Kryo kryo;
+
+    static byte[] smallSerializedWithUos;
+    static byte[] smallSerializedWithJava;
+    static byte[] smallSerializedWithKryo;
+
+    static byte[] mediumSerializedWithUos;
+    static byte[] mediumSerializedWithJava;
+    static byte[] mediumSerializedWithKryo;
+
+    static byte[] largeSerializedWithUos;
+    static byte[] largeSerializedWithJava;
+    static byte[] largeSerializedWithKryo;
+
+    static {
+        registry = new ClassDescriptorRegistry();
+        var factory = new ClassDescriptorFactory(registry);
+        userObjectMarshaller = new DefaultUserObjectMarshaller(registry, factory);
+
+        smallObject = new TestClass(1000, false, 3000);
+        mediumObject = AllTypesMessageGenerator.generate(10, true, false);
+        largeObject = AllTypesMessageGenerator.generate(10, true, true);
+
+        factory.create(smallObject.getClass());
+        factory.create(largeObject.getClass());
+
+        kryo = new Kryo();
+        kryo.setInstantiatorStrategy(new StdInstantiatorStrategy());
+
+        try {
+            smallSerializedWithUos = userObjectMarshaller.marshal(smallObject).bytes();
+            mediumSerializedWithUos = userObjectMarshaller.marshal(mediumObject).bytes();
+            largeSerializedWithUos = userObjectMarshaller.marshal(largeObject).bytes();
+        } catch (MarshalException e) {
+            throw new RuntimeException(e);
+        }
+
+        smallSerializedWithJava = serializeWithJdk(smallObject);
+        mediumSerializedWithJava = serializeWithJdk(mediumObject);
+        largeSerializedWithJava = serializeWithJdk(largeObject);
+
+        smallSerializedWithKryo = serializeWithKryo(smallObject);
+        mediumSerializedWithKryo = serializeWithKryo(mediumObject);
+        largeSerializedWithKryo = serializeWithKryo(largeObject);
+
+        System.out.println("Small Java: " + smallSerializedWithJava.length);
+        System.out.println("Small UOS : " + smallSerializedWithUos.length);
+        System.out.println("Small Kryo: " + smallSerializedWithKryo.length);
+
+        System.out.println("Medium Java: " + mediumSerializedWithJava.length);
+        System.out.println("Medium UOS : " + mediumSerializedWithUos.length);
+        System.out.println("Medium Kryo: " + mediumSerializedWithKryo.length);
+
+        System.out.println("Large Java: " + largeSerializedWithJava.length);
+        System.out.println("Large UOS : " + largeSerializedWithUos.length);
+        System.out.println("Large Kryo: " + largeSerializedWithKryo.length);
+    }
+
+    /**
+     * Runs the benchmark.
+     *
+     * @param args args
+     * @throws Exception if something goes wrong
+     */
+    public static void main(String[] args) throws Exception {
+        Options build = new OptionsBuilder()
+                //.addProfiler("gc")
+                .include(SerializationMicroBenchmark.class.getName() + ".*").build();
+
+        new Runner(build).run();
+    }
+
+    @Benchmark
+    public byte[] serialization_01_small_jdk() {
+        return serializeWithJdk(smallObject);
+    }
+
+    @Benchmark
+    public byte[] serialization_11_medium_jdk() {
+        return serializeWithJdk(mediumObject);
+    }
+
+    @Benchmark
+    public byte[] serialization_21_large_jdk() {
+        return serializeWithJdk(largeObject);
+    }
+
+    @Benchmark
+    public void serialization_02_small_uos(Blackhole blackhole) {
+        benchmarkUosSerialization(smallObject, blackhole);
+    }
+
+    @Benchmark
+    public void serialization_12_medium_uos(Blackhole blackhole) {
+        benchmarkUosSerialization(mediumObject, blackhole);
+    }
+
+    @Benchmark
+    public void serialization_22_large_uos(Blackhole blackhole) {
+        benchmarkUosSerialization(largeObject, blackhole);
+    }
+
+    private void benchmarkUosSerialization(Object obj, Blackhole blackhole) {
+        try {
+            MarshalledObject marshal = userObjectMarshaller.marshal(obj);
+
+            blackhole.consume(marshal);
+            blackhole.consume(marshal.bytes());
+        } catch (MarshalException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    @Benchmark
+    public byte[] serialization_03_small_kryo() {
+        return serializeWithKryo(smallObject);
+    }
+
+    @Benchmark
+    public byte[] serialization_13_medium_kryo() {
+        return serializeWithKryo(mediumObject);
+    }
+
+    @Benchmark
+    public byte[] serialization_23_large_kryo() {
+        return serializeWithKryo(largeObject);
+    }
+
+    @Benchmark
+    public Object deserialization_01_small_jdk() {
+        return deserializeWithJava(SerializationMicroBenchmark.smallSerializedWithJava);
+    }
+
+    @Benchmark
+    public Object deserialization_11_medium_jdk() {
+        return deserializeWithJava(SerializationMicroBenchmark.mediumSerializedWithJava);
+    }
+
+    @Benchmark
+    public Object deserialization_21_large_jdk() {
+        return deserializeWithJava(SerializationMicroBenchmark.largeSerializedWithJava);
+    }
+
+    private Object deserializeWithJava(byte[] serialized) {
+        try (var bis = new ByteArrayInputStream(serialized); var ois = new ObjectInputStream(bis)) {
+            return ois.readObject();
+        } catch (IOException | ClassNotFoundException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    @Benchmark
+    public Object deserialization_02_small_uos() {
+        return deserializeWithUos(smallSerializedWithUos);
+    }
+
+    @Benchmark
+    public Object deserialization_12_medium_uos() {
+        return deserializeWithUos(mediumSerializedWithUos);
+    }
+
+    @Benchmark
+    public Object deserialization_22_large_uos() {
+        return deserializeWithUos(largeSerializedWithUos);
+    }
+
+    @Nullable
+    private Object deserializeWithUos(byte[] serialized) {
+        try {
+            return userObjectMarshaller.unmarshal(serialized, registry);
+        } catch (UnmarshalException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    @Benchmark
+    public Object deserialization_03_small_kryo() {
+        return deserializeWithKryo(smallSerializedWithKryo);
+    }
+
+    @Benchmark
+    public Object deserialization_13_medium_kryo() {
+        return deserializeWithKryo(mediumSerializedWithKryo);
+    }
+
+    @Benchmark
+    public Object deserialization_23_large_kryo() {
+        return deserializeWithKryo(largeSerializedWithKryo);
+    }
+
+    private Object deserializeWithKryo(byte[] serialized) {
+        ByteArrayInputStream stream = new ByteArrayInputStream(serialized);
+        try (Input input = new Input(stream)) {
+            return kryo.readClassAndObject(input);
+        }
+    }
+
+    private static byte[] serializeWithJdk(Object obj) {
+        try (var bos = new ByteArrayOutputStream(); var oos = new ObjectOutputStream(bos)) {
+            oos.writeObject(obj);
+
+            return bos.toByteArray();
+        } catch (IOException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    private static byte[] serializeWithKryo(Object obj) {
+        ByteArrayOutputStream stream = new ByteArrayOutputStream();
+        try (Output output = new Output(stream)) {
+            kryo.writeClassAndObject(output, obj);
+            output.close();
+
+            return stream.toByteArray();
+        }
+    }
+
+    static class TestClass implements Serializable {
+
+        private int foo;
+
+        private boolean bar;
+
+        private double foobar;
+
+        public TestClass() {
+        }
+
+        public TestClass(int foo, boolean bar, double foobar) {
+            this.foo = foo;
+            this.bar = bar;
+            this.foobar = foobar;
+        }
+    }
+
+}
diff --git a/modules/network/src/test/java/org/apache/ignite/internal/network/UosProfilerTarget.java b/modules/network/src/test/java/org/apache/ignite/internal/network/UosProfilerTarget.java
new file mode 100644
index 0000000..ce60d6d
--- /dev/null
+++ b/modules/network/src/test/java/org/apache/ignite/internal/network/UosProfilerTarget.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.network;
+
+import java.util.Arrays;
+import java.util.Objects;
+import org.apache.ignite.internal.network.SerializationMicroBenchmark.TestClass;
+import org.apache.ignite.internal.network.serialization.ClassDescriptorFactory;
+import org.apache.ignite.internal.network.serialization.ClassDescriptorRegistry;
+import org.apache.ignite.internal.network.serialization.marshal.DefaultUserObjectMarshaller;
+import org.apache.ignite.internal.network.serialization.marshal.MarshalException;
+import org.apache.ignite.internal.network.serialization.marshal.MarshalledObject;
+import org.apache.ignite.internal.network.serialization.marshal.UserObjectMarshaller;
+
+/**
+ * For running with a profiler.
+ */
+public class UosProfilerTarget {
+
+    static Object obj;
+
+    static final ClassDescriptorRegistry registry = new ClassDescriptorRegistry();
+    static UserObjectMarshaller userObjectMarshaller;
+
+    static Class<?> clz;
+
+    static byte[] marshalledByUos;
+
+    static {
+        var factory = new ClassDescriptorFactory(registry);
+        userObjectMarshaller = new DefaultUserObjectMarshaller(registry, factory);
+        // obj = AllTypesMessageGenerator.generate(10, true, true);
+        obj = AllTypesMessageGenerator.generate(10, true, false);
+        // obj = new TestClass(1000, false, 3000);
+        clz = obj.getClass();
+        factory.create(clz);
+        try {
+            MarshalledObject marshal = userObjectMarshaller.marshal(obj);
+            marshalledByUos = marshal.bytes();
+        } catch (MarshalException e) {
+            e.printStackTrace();
+        }
+    }
+
+    public static void main(String[] args) throws Exception {
+        //serialize();
+        deserialize();
+    }
+
+    private static void serialize() throws Exception {
+        int accumulatedCode = 1;
+        int count = (obj instanceof TestClass) ? 30_000_000 : 100_000;
+        for (int i = 0; i < count; i++) {
+            byte[] bytes = doMarshal(UosProfilerTarget.obj);
+            int code = Arrays.hashCode(bytes);
+            accumulatedCode ^= code;
+        }
+
+        System.out.println(accumulatedCode);
+    }
+
+    private static byte[] doMarshal(Object obj1) throws Exception {
+        MarshalledObject marshal = userObjectMarshaller.marshal(obj1);
+        return marshal.bytes();
+    }
+
+    private static void deserialize() throws Exception {
+        int accumulatedCode = 1;
+        int count = (obj instanceof TestClass) ? 30_000_000 : 100_000;
+        for (int i = 0; i < count; i++) {
+            Object object = doUnmarshal(marshalledByUos);
+            int code = Objects.hashCode(object);
+            accumulatedCode ^= code;
+        }
+
+        System.out.println(accumulatedCode);
+    }
+
+    private static Object doUnmarshal(byte[] bytes) throws Exception {
+        return userObjectMarshaller.unmarshal(bytes, registry);
+    }
+}
diff --git a/modules/network/src/test/java/org/apache/ignite/internal/network/serialization/BuiltInDescriptorsTest.java b/modules/network/src/test/java/org/apache/ignite/internal/network/serialization/BuiltInDescriptorsTest.java
index 9d5bd97..1ca38db 100644
--- a/modules/network/src/test/java/org/apache/ignite/internal/network/serialization/BuiltInDescriptorsTest.java
+++ b/modules/network/src/test/java/org/apache/ignite/internal/network/serialization/BuiltInDescriptorsTest.java
@@ -58,6 +58,7 @@ import static org.apache.ignite.internal.network.serialization.BuiltInType.SHORT
 import static org.apache.ignite.internal.network.serialization.BuiltInType.SHORT_BOXED;
 import static org.apache.ignite.internal.network.serialization.BuiltInType.SINGLETON_LIST;
 import static org.apache.ignite.internal.network.serialization.BuiltInType.STRING;
+import static org.apache.ignite.internal.network.serialization.BuiltInType.STRING_LATIN1;
 import static org.apache.ignite.internal.network.serialization.BuiltInType.UUID;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 
@@ -114,5 +115,6 @@ public class BuiltInDescriptorsTest {
         assertEquals(39, REFERENCE.descriptorId());
         assertEquals(40, CLASS.descriptorId());
         assertEquals(41, PROXY.descriptorId());
+        assertEquals(42, STRING_LATIN1.descriptorId());
     }
 }
diff --git a/modules/network/src/test/java/org/apache/ignite/internal/network/serialization/ClassesTest.java b/modules/network/src/test/java/org/apache/ignite/internal/network/serialization/ClassesTest.java
index bdeb7f1..fc9878a 100644
--- a/modules/network/src/test/java/org/apache/ignite/internal/network/serialization/ClassesTest.java
+++ b/modules/network/src/test/java/org/apache/ignite/internal/network/serialization/ClassesTest.java
@@ -202,6 +202,16 @@ class ClassesTest {
         assertFalse(Classes.isRuntimeTypeKnownUpfront(Enum[].class));
     }
 
+    @Test
+    void isValueTypeKnownUpfrontReturnsFalseForString() {
+        assertFalse(Classes.isRuntimeTypeKnownUpfront(String.class));
+    }
+
+    @Test
+    void isValueTypeKnownUpfrontReturnsFalseForArrayOfString() {
+        assertFalse(Classes.isRuntimeTypeKnownUpfront(String[].class));
+    }
+
     private static class EmptySerializable implements Serializable {
     }
 
diff --git a/modules/network/src/test/java/org/apache/ignite/internal/network/serialization/MarshallableTest.java b/modules/network/src/test/java/org/apache/ignite/internal/network/serialization/MarshallableTest.java
index b3e403c..30c64ab 100644
--- a/modules/network/src/test/java/org/apache/ignite/internal/network/serialization/MarshallableTest.java
+++ b/modules/network/src/test/java/org/apache/ignite/internal/network/serialization/MarshallableTest.java
@@ -30,13 +30,13 @@ import io.netty.buffer.ByteBuf;
 import io.netty.buffer.UnpooledByteBufAllocator;
 import io.netty.channel.ChannelHandlerContext;
 import io.netty.channel.embedded.EmbeddedChannel;
+import it.unimi.dsi.fastutil.ints.IntSets;
 import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
 import java.io.ObjectInputStream;
 import java.io.ObjectOutputStream;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
-import java.util.Collections;
 import java.util.Map;
 import org.apache.ignite.internal.network.direct.DirectMessageWriter;
 import org.apache.ignite.internal.network.netty.ConnectionManager;
@@ -192,7 +192,7 @@ public class MarshallableTest {
             try (ByteArrayOutputStream baos = new ByteArrayOutputStream(); ObjectOutputStream oos = new ObjectOutputStream(baos)) {
                 oos.writeObject(object);
                 oos.close();
-                return new MarshalledObject(baos.toByteArray(), Collections.singleton(descriptor));
+                return new MarshalledObject(baos.toByteArray(), IntSets.singleton(descriptor.descriptorId()));
             } catch (Exception e) {
                 throw new RuntimeException(e);
             }
diff --git a/modules/network/src/test/java/org/apache/ignite/internal/network/serialization/marshal/DefaultUserObjectMarshallerCommonTest.java b/modules/network/src/test/java/org/apache/ignite/internal/network/serialization/marshal/DefaultUserObjectMarshallerCommonTest.java
index 3e2e036..a2254d7 100644
--- a/modules/network/src/test/java/org/apache/ignite/internal/network/serialization/marshal/DefaultUserObjectMarshallerCommonTest.java
+++ b/modules/network/src/test/java/org/apache/ignite/internal/network/serialization/marshal/DefaultUserObjectMarshallerCommonTest.java
@@ -18,12 +18,20 @@
 package org.apache.ignite.internal.network.serialization.marshal;
 
 import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.notNullValue;
 import static org.junit.jupiter.api.Assertions.assertThrows;
 
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.io.Serializable;
 import java.util.Arrays;
+import java.util.List;
 import org.apache.ignite.internal.network.serialization.ClassDescriptorFactory;
 import org.apache.ignite.internal.network.serialization.ClassDescriptorRegistry;
+import org.apache.ignite.internal.network.serialization.DescriptorRegistry;
 import org.junit.jupiter.api.Test;
 
 /**
@@ -35,6 +43,9 @@ class DefaultUserObjectMarshallerCommonTest {
 
     private final DefaultUserObjectMarshaller marshaller = new DefaultUserObjectMarshaller(descriptorRegistry, descriptorFactory);
 
+    static DefaultUserObjectMarshaller staticMarshaller;
+    static DescriptorRegistry staticRegistry;
+
     @Test
     void throwsOnExcessiveInputWhenUnmarshalling() throws Exception {
         MarshalledObject marshalled = marshaller.marshal(null);
@@ -45,4 +56,67 @@ class DefaultUserObjectMarshallerCommonTest {
         assertThat(ex.getMessage(), is("After reading a value, 1 excessive byte(s) still remain"));
     }
 
+    @Test
+    void previousInvocationsOfMarshallingInSameThreadDoNotHaveVisibleEffectsOnFollowingInvocations() throws Exception {
+        List<Integer> list = List.of(1, 2, 3);
+
+        MarshalledObject firstResult = marshaller.marshal(list);
+
+        MarshalledObject secondResult = marshaller.marshal(list);
+
+        assertThat(marshaller.unmarshal(secondResult.bytes(), descriptorRegistry), is(equalTo(list)));
+        assertThat(secondResult.bytes(), is(equalTo(firstResult.bytes())));
+    }
+
+    @Test
+    void nestedMarshallingUnmarshallingInsideWriteReadObjectDoNotInterfereWithOutsideMarshallingUnmarshalling() throws Exception {
+        staticMarshaller = marshaller;
+        staticRegistry = descriptorRegistry;
+
+        WithNestedMarshalling unmarshalled = marshalAndUnmarshalNotNull(new WithNestedMarshalling(42));
+
+        assertThat(unmarshalled.value, is(42));
+    }
+
+    private <T> T marshalAndUnmarshalNotNull(Object object) throws MarshalException, UnmarshalException {
+        MarshalledObject marshalled = marshaller.marshal(object);
+        T unmarshalled = marshaller.unmarshal(marshalled.bytes(), descriptorRegistry);
+
+        assertThat(unmarshalled, is(notNullValue()));
+
+        return unmarshalled;
+    }
+
+    private static class WithNestedMarshalling implements Serializable {
+        private final int value;
+
+        private WithNestedMarshalling(int value) {
+            this.value = value;
+        }
+
+        private void writeObject(ObjectOutputStream stream) throws IOException {
+            marshalSomethingElse();
+
+            stream.defaultWriteObject();
+        }
+
+        private byte[] marshalSomethingElse() {
+            try {
+                return staticMarshaller.marshal(List.of(1, 2, 3)).bytes();
+            } catch (MarshalException e) {
+                throw new RuntimeException(e);
+            }
+        }
+
+        private void readObject(ObjectInputStream stream) throws IOException, ClassNotFoundException {
+            byte[] marshalledBytes = marshalSomethingElse();
+            try {
+                staticMarshaller.unmarshal(marshalledBytes, staticRegistry);
+            } catch (UnmarshalException e) {
+                throw new RuntimeException(e);
+            }
+
+            stream.defaultReadObject();;
+        }
+    }
 }
diff --git a/modules/network/src/test/java/org/apache/ignite/internal/network/serialization/marshal/DefaultUserObjectMarshallerConcreteTypesKnownUpfrontOptimizationTest.java b/modules/network/src/test/java/org/apache/ignite/internal/network/serialization/marshal/DefaultUserObjectMarshallerConcreteTypesKnownUpfrontOptimizationTest.java
index 6bd22e9..94ca19d 100644
--- a/modules/network/src/test/java/org/apache/ignite/internal/network/serialization/marshal/DefaultUserObjectMarshallerConcreteTypesKnownUpfrontOptimizationTest.java
+++ b/modules/network/src/test/java/org/apache/ignite/internal/network/serialization/marshal/DefaultUserObjectMarshallerConcreteTypesKnownUpfrontOptimizationTest.java
@@ -23,8 +23,7 @@ import static org.hamcrest.Matchers.greaterThanOrEqualTo;
 import static org.hamcrest.Matchers.is;
 import static org.hamcrest.Matchers.notNullValue;
 
-import java.io.ByteArrayInputStream;
-import java.io.DataInputStream;
+import java.io.DataInput;
 import java.io.IOException;
 import java.io.ObjectInputStream;
 import java.io.ObjectOutputStream;
@@ -32,7 +31,8 @@ import java.io.Serializable;
 import org.apache.ignite.internal.network.serialization.BuiltInType;
 import org.apache.ignite.internal.network.serialization.ClassDescriptorFactory;
 import org.apache.ignite.internal.network.serialization.ClassDescriptorRegistry;
-import org.jetbrains.annotations.NotNull;
+import org.apache.ignite.internal.util.io.IgniteDataInput;
+import org.apache.ignite.internal.util.io.IgniteUnsafeDataInput;
 import org.junit.jupiter.api.Test;
 
 class DefaultUserObjectMarshallerConcreteTypesKnownUpfrontOptimizationTest {
@@ -48,23 +48,22 @@ class DefaultUserObjectMarshallerConcreteTypesKnownUpfrontOptimizationTest {
     void writesOnlyThePrimitiveValueForPrimitiveFields() throws Exception {
         MarshalledObject marshalled = marshaller.marshal(new WithPrimitiveField((byte) 123));
 
-        DataInputStream dis = openDataStreamAndSkipRootObjectHeader(marshalled);
+        IgniteDataInput dis = openDataStreamAndSkipRootObjectHeader(marshalled);
 
         assertThat(dis.readAllBytes(), is(new byte[]{123}));
     }
 
-    private DataInputStream openDataStreamAndSkipRootObjectHeader(MarshalledObject marshalled) throws IOException {
-        DataInputStream dis = openDataStream(marshalled);
+    private IgniteDataInput openDataStreamAndSkipRootObjectHeader(MarshalledObject marshalled) throws IOException {
+        IgniteDataInput dis = openDataStream(marshalled);
         skipTillRootObjectData(dis);
         return dis;
     }
 
-    @NotNull
-    private DataInputStream openDataStream(MarshalledObject marshalled) {
-        return new DataInputStream(new ByteArrayInputStream(marshalled.bytes()));
+    private IgniteDataInput openDataStream(MarshalledObject marshalled) {
+        return new IgniteUnsafeDataInput(marshalled.bytes());
     }
 
-    private void skipTillRootObjectData(DataInputStream dos) throws IOException {
+    private void skipTillRootObjectData(DataInput dos) throws IOException {
         ProtocolMarshalling.readDescriptorOrCommandId(dos);
         ProtocolMarshalling.readObjectId(dos);
     }
@@ -73,7 +72,7 @@ class DefaultUserObjectMarshallerConcreteTypesKnownUpfrontOptimizationTest {
     void treatsPrimitiveArrayFieldsAsFieldsWithTypeKnownUpfront() throws Exception {
         MarshalledObject marshalled = marshaller.marshal(new WithPrimitiveArrayField(new byte[]{123}));
 
-        DataInputStream dis = openDataStreamAndSkipRootObjectHeader(marshalled);
+        IgniteDataInput dis = openDataStreamAndSkipRootObjectHeader(marshalled);
 
         skipOneByteEmptyNullBitMask(dis);
         assertThat(ProtocolMarshalling.readObjectId(dis), is(SECOND_USER_OBJECT_ID));
@@ -85,7 +84,7 @@ class DefaultUserObjectMarshallerConcreteTypesKnownUpfrontOptimizationTest {
     void doesNotWriteNullsBitmapForPrimitiveArrays() throws Exception {
         MarshalledObject marshalled = marshaller.marshal(new byte[]{123});
 
-        DataInputStream dis = openDataStreamAndSkipRootObjectHeader(marshalled);
+        IgniteDataInput dis = openDataStreamAndSkipRootObjectHeader(marshalled);
 
         assertThat(ProtocolMarshalling.readLength(dis), is(1));
         assertThat(dis.readAllBytes(), is(new byte[]{123}));
@@ -95,7 +94,7 @@ class DefaultUserObjectMarshallerConcreteTypesKnownUpfrontOptimizationTest {
     void writesNullsBitmapAndNoDescriptorIdForNonNullPrimitiveWrapperFields() throws Exception {
         MarshalledObject marshalled = marshaller.marshal(new WithPrimitiveWrapperField((byte) 123));
 
-        DataInputStream dis = openDataStreamAndSkipRootObjectHeader(marshalled);
+        IgniteDataInput dis = openDataStreamAndSkipRootObjectHeader(marshalled);
 
         skipOneByteEmptyNullBitMask(dis);
         assertThat(ProtocolMarshalling.readObjectId(dis), is(SECOND_USER_OBJECT_ID));
@@ -103,7 +102,7 @@ class DefaultUserObjectMarshallerConcreteTypesKnownUpfrontOptimizationTest {
         assertThat(dis.readAllBytes(), is(new byte[]{123}));
     }
 
-    private void skipOneByteEmptyNullBitMask(DataInputStream dis) throws IOException {
+    private void skipOneByteEmptyNullBitMask(DataInput dis) throws IOException {
         assertThat(dis.readByte(), is((byte) 0));
     }
 
@@ -111,7 +110,7 @@ class DefaultUserObjectMarshallerConcreteTypesKnownUpfrontOptimizationTest {
     void writesOnlyNullsBitmapForNullPrimitiveWrapperFields() throws Exception {
         MarshalledObject marshalled = marshaller.marshal(new WithPrimitiveWrapperField(null));
 
-        DataInputStream dis = openDataStreamAndSkipRootObjectHeader(marshalled);
+        IgniteDataInput dis = openDataStreamAndSkipRootObjectHeader(marshalled);
 
         final byte nullsBitmapWithOneBitSet = 1;
         assertThat(dis.readAllBytes(), is(new byte[]{nullsBitmapWithOneBitSet}));
@@ -121,7 +120,7 @@ class DefaultUserObjectMarshallerConcreteTypesKnownUpfrontOptimizationTest {
     void writesNullsBitmapAndNoDescriptorIdForNonNullValueOfFieldWhichTypeIsKnownUpfront() throws Exception {
         MarshalledObject marshalled = marshaller.marshal(new WithFieldOfTypeKnownUpfront(new FinalClass((byte) 123)));
 
-        DataInputStream dis = openDataStreamAndSkipRootObjectHeader(marshalled);
+        IgniteDataInput dis = openDataStreamAndSkipRootObjectHeader(marshalled);
 
         skipOneByteEmptyNullBitMask(dis);
         assertThat(ProtocolMarshalling.readObjectId(dis), is(SECOND_USER_OBJECT_ID));
@@ -133,7 +132,7 @@ class DefaultUserObjectMarshallerConcreteTypesKnownUpfrontOptimizationTest {
     void writesOnlyNullsBitmapForNullValueOfFieldWhichTypeIsKnownUpfront() throws Exception {
         MarshalledObject marshalled = marshaller.marshal(new WithFieldOfTypeKnownUpfront(null));
 
-        DataInputStream dis = openDataStreamAndSkipRootObjectHeader(marshalled);
+        IgniteDataInput dis = openDataStreamAndSkipRootObjectHeader(marshalled);
 
         final byte nullsBitmapWithOneBitSet = (byte) 1;
         assertThat(dis.readAllBytes(), is(new byte[]{nullsBitmapWithOneBitSet}));
@@ -143,7 +142,7 @@ class DefaultUserObjectMarshallerConcreteTypesKnownUpfrontOptimizationTest {
     void writesDescriptorIdAndObjectIdAndTheValueForNonNullValueOfFieldWhichTypeIsNotKnownUpfront() throws Exception {
         MarshalledObject marshalled = marshaller.marshal(new WithFieldOfTypeNotKnownUpfront(new NonFinalClass((byte) 123)));
 
-        DataInputStream dis = openDataStreamAndSkipRootObjectHeader(marshalled);
+        IgniteDataInput dis = openDataStreamAndSkipRootObjectHeader(marshalled);
 
         assertThat(ProtocolMarshalling.readDescriptorOrCommandId(dis), greaterThanOrEqualTo(MIN_CUSTOM_DESCRIPTOR_ID));
         assertThat(ProtocolMarshalling.readObjectId(dis), is(SECOND_USER_OBJECT_ID));
@@ -155,7 +154,7 @@ class DefaultUserObjectMarshallerConcreteTypesKnownUpfrontOptimizationTest {
     void writesOnlyNullMarkerForNullValueOfFieldWhichTypeIsNotKnownUpfront() throws Exception {
         MarshalledObject marshalled = marshaller.marshal(new WithFieldOfTypeNotKnownUpfront(null));
 
-        DataInputStream dis = openDataStreamAndSkipRootObjectHeader(marshalled);
+        IgniteDataInput dis = openDataStreamAndSkipRootObjectHeader(marshalled);
 
         assertThat(dis.readAllBytes(), is(new byte[]{(byte) BuiltInType.NULL.descriptorId()}));
     }
@@ -166,7 +165,7 @@ class DefaultUserObjectMarshallerConcreteTypesKnownUpfrontOptimizationTest {
                 new WithArrayFieldOfTypeKnownUpfront(new FinalClass[]{new FinalClass((byte) 123)})
         );
 
-        DataInputStream dis = openDataStreamAndSkipRootObjectHeader(marshalled);
+        IgniteDataInput dis = openDataStreamAndSkipRootObjectHeader(marshalled);
 
         // beginning of array of FinalClass
         skipOneByteEmptyNullBitMask(dis);
@@ -188,7 +187,7 @@ class DefaultUserObjectMarshallerConcreteTypesKnownUpfrontOptimizationTest {
                 new WithArrayFieldOfTypeNotKnownUpfront(new NonFinalClass[]{new NonFinalClass((byte) 123)})
         );
 
-        DataInputStream dis = openDataStreamAndSkipRootObjectHeader(marshalled);
+        IgniteDataInput dis = openDataStreamAndSkipRootObjectHeader(marshalled);
 
         // beginning of array of NonFinalClass
         assertThat(ProtocolMarshalling.readDescriptorOrCommandId(dis), greaterThanOrEqualTo(MIN_CUSTOM_DESCRIPTOR_ID));
@@ -208,7 +207,7 @@ class DefaultUserObjectMarshallerConcreteTypesKnownUpfrontOptimizationTest {
     void writesNullsBitmapAndNoDescriptorIdForNonNullValueOfSimpleEnumField() throws Exception {
         MarshalledObject marshalled = marshaller.marshal(new WithSimpleEnumField(SimpleEnum.FIRST));
 
-        DataInputStream dis = openDataStreamAndSkipRootObjectHeader(marshalled);
+        IgniteDataInput dis = openDataStreamAndSkipRootObjectHeader(marshalled);
 
         skipOneByteEmptyNullBitMask(dis);
         assertThat(ProtocolMarshalling.readObjectId(dis), is(SECOND_USER_OBJECT_ID));
@@ -220,7 +219,7 @@ class DefaultUserObjectMarshallerConcreteTypesKnownUpfrontOptimizationTest {
     void writesNullsBitmapAndNoDescriptorIdForNonNullValueOfEnumWithAnonClassForMemberField() throws Exception {
         MarshalledObject marshalled = marshaller.marshal(new WithEnumWithAnonClassesForMembersField(EnumWithAnonClassesForMembers.FIRST));
 
-        DataInputStream dis = openDataStreamAndSkipRootObjectHeader(marshalled);
+        IgniteDataInput dis = openDataStreamAndSkipRootObjectHeader(marshalled);
 
         skipOneByteEmptyNullBitMask(dis);
         assertThat(ProtocolMarshalling.readObjectId(dis), is(SECOND_USER_OBJECT_ID));
@@ -232,7 +231,7 @@ class DefaultUserObjectMarshallerConcreteTypesKnownUpfrontOptimizationTest {
     void writesNullsBitmapAndNoDescriptorIdForNonNullSimpleEnumArrayElement() throws Exception {
         MarshalledObject marshalled = marshaller.marshal(new SimpleEnum[]{SimpleEnum.FIRST});
 
-        DataInputStream dis = openDataStream(marshalled);
+        IgniteDataInput dis = openDataStream(marshalled);
 
         assertThat(ProtocolMarshalling.readDescriptorOrCommandId(dis), greaterThanOrEqualTo(MIN_CUSTOM_DESCRIPTOR_ID));
         ProtocolMarshalling.readObjectId(dis);
@@ -249,7 +248,7 @@ class DefaultUserObjectMarshallerConcreteTypesKnownUpfrontOptimizationTest {
     void writesNullsBitmapAndNoDescriptorIdForNonNullEnumWithAnonClassForMemberArrayElement() throws Exception {
         MarshalledObject marshalled = marshaller.marshal(new EnumWithAnonClassesForMembers[]{EnumWithAnonClassesForMembers.FIRST});
 
-        DataInputStream dis = openDataStream(marshalled);
+        IgniteDataInput dis = openDataStream(marshalled);
 
         assertThat(ProtocolMarshalling.readDescriptorOrCommandId(dis), greaterThanOrEqualTo(MIN_CUSTOM_DESCRIPTOR_ID));
         ProtocolMarshalling.readObjectId(dis);
@@ -266,7 +265,7 @@ class DefaultUserObjectMarshallerConcreteTypesKnownUpfrontOptimizationTest {
     void writesFullDescriptorIdsForEnumsInAbstractEnumArray() throws Exception {
         MarshalledObject marshalled = marshaller.marshal(new Enum[]{SimpleEnum.FIRST});
 
-        DataInputStream dis = openDataStream(marshalled);
+        IgniteDataInput dis = openDataStream(marshalled);
 
         assertThat(ProtocolMarshalling.readDescriptorOrCommandId(dis), greaterThanOrEqualTo(MIN_CUSTOM_DESCRIPTOR_ID));
         ProtocolMarshalling.readObjectId(dis);
@@ -283,7 +282,7 @@ class DefaultUserObjectMarshallerConcreteTypesKnownUpfrontOptimizationTest {
     void supportsFinalClassOptimizationInPutFields() throws Exception {
         MarshalledObject marshalled = marshaller.marshal(new WithPutFieldsReadFields());
 
-        DataInputStream dis = openDataStreamAndSkipRootObjectHeader(marshalled);
+        IgniteDataInput dis = openDataStreamAndSkipRootObjectHeader(marshalled);
 
         skipOneByteEmptyNullBitMask(dis);
         assertThat(ProtocolMarshalling.readObjectId(dis), is(SECOND_USER_OBJECT_ID));
diff --git a/modules/network/src/test/java/org/apache/ignite/internal/network/serialization/marshal/DefaultUserObjectMarshallerWithArbitraryObjectsTest.java b/modules/network/src/test/java/org/apache/ignite/internal/network/serialization/marshal/DefaultUserObjectMarshallerWithArbitraryObjectsTest.java
index 29e094f..26a9852 100644
--- a/modules/network/src/test/java/org/apache/ignite/internal/network/serialization/marshal/DefaultUserObjectMarshallerWithArbitraryObjectsTest.java
+++ b/modules/network/src/test/java/org/apache/ignite/internal/network/serialization/marshal/DefaultUserObjectMarshallerWithArbitraryObjectsTest.java
@@ -94,9 +94,9 @@ class DefaultUserObjectMarshallerWithArbitraryObjectsTest {
     void marshalsArbitraryObjectsUsingDescriptorsOfThemAndTheirContents() throws Exception {
         MarshalledObject marshalled = marshaller.marshal(new Simple(42));
 
-        assertThat(marshalled.usedDescriptors(), equalTo(Set.of(
-                descriptorRegistry.getRequiredDescriptor(Simple.class),
-                descriptorRegistry.getBuiltInDescriptor(BuiltInType.INT)
+        assertThat(marshalled.usedDescriptorIds(), equalTo(Set.of(
+                descriptorRegistry.getRequiredDescriptor(Simple.class).descriptorId(),
+                descriptorRegistry.getBuiltInDescriptor(BuiltInType.INT).descriptorId()
         )));
     }
 
@@ -132,9 +132,9 @@ class DefaultUserObjectMarshallerWithArbitraryObjectsTest {
     void usesDescriptorsOfAllAncestors() throws Exception {
         MarshalledObject marshalled = marshaller.marshal(new Child("answer", 42));
 
-        assertThat(marshalled.usedDescriptors(), hasItems(
-                descriptorRegistry.getRequiredDescriptor(Parent.class),
-                descriptorRegistry.getRequiredDescriptor(Child.class)
+        assertThat(marshalled.usedDescriptorIds(), hasItems(
+                descriptorRegistry.getRequiredDescriptor(Parent.class).descriptorId(),
+                descriptorRegistry.getRequiredDescriptor(Child.class).descriptorId()
         ));
     }
 
@@ -532,6 +532,27 @@ class DefaultUserObjectMarshallerWithArbitraryObjectsTest {
         return Thread.currentThread().getContextClassLoader();
     }
 
+    @Test
+    void asciiInStringFieldIsSupported() throws Exception {
+        WithStringField unmarshalled = marshalAndUnmarshalNonNull(new WithStringField("a"));
+
+        assertThat(unmarshalled.value, is("a"));
+    }
+
+    @Test
+    void nonAsciiLatin1InStringFieldIsSupported() throws Exception {
+        WithStringField unmarshalled = marshalAndUnmarshalNonNull(new WithStringField("é"));
+
+        assertThat(unmarshalled.value, is("é"));
+    }
+
+    @Test
+    void nonLatin1InStringFieldIsSupported() throws Exception {
+        WithStringField unmarshalled = marshalAndUnmarshalNonNull(new WithStringField("щ"));
+
+        assertThat(unmarshalled.value, is("щ"));
+    }
+
     private static boolean noArgs(Method method) {
         return method.getParameterTypes().length == 0;
     }
@@ -758,4 +779,12 @@ class DefaultUserObjectMarshallerWithArbitraryObjectsTest {
             throw new RuntimeException("Don't know how to handle " + method + " with args" + Arrays.toString(args));
         }
     }
+
+    private static class WithStringField {
+        private final String value;
+
+        private WithStringField(String value) {
+            this.value = value;
+        }
+    }
 }
diff --git a/modules/network/src/test/java/org/apache/ignite/internal/network/serialization/marshal/DefaultUserObjectMarshallerWithBuiltinsTest.java b/modules/network/src/test/java/org/apache/ignite/internal/network/serialization/marshal/DefaultUserObjectMarshallerWithBuiltinsTest.java
index 0ea20fb..1665b8c 100644
--- a/modules/network/src/test/java/org/apache/ignite/internal/network/serialization/marshal/DefaultUserObjectMarshallerWithBuiltinsTest.java
+++ b/modules/network/src/test/java/org/apache/ignite/internal/network/serialization/marshal/DefaultUserObjectMarshallerWithBuiltinsTest.java
@@ -29,6 +29,7 @@ import static org.hamcrest.Matchers.notNullValue;
 import static org.hamcrest.Matchers.sameInstance;
 
 import java.io.ByteArrayInputStream;
+import java.io.DataInput;
 import java.io.DataInputStream;
 import java.io.IOException;
 import java.math.BigDecimal;
@@ -53,6 +54,8 @@ import org.apache.ignite.internal.network.serialization.BuiltInType;
 import org.apache.ignite.internal.network.serialization.ClassDescriptor;
 import org.apache.ignite.internal.network.serialization.ClassDescriptorFactory;
 import org.apache.ignite.internal.network.serialization.ClassDescriptorRegistry;
+import org.apache.ignite.internal.util.io.IgniteDataInput;
+import org.apache.ignite.internal.util.io.IgniteUnsafeDataInput;
 import org.apache.ignite.lang.IgniteUuid;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.params.ParameterizedTest;
@@ -89,7 +92,8 @@ class DefaultUserObjectMarshallerWithBuiltinsTest {
     void marshalsBareObjectUsingOnlyBareObjectDescriptor() throws Exception {
         MarshalledObject marshalled = marshaller.marshal(new Object());
 
-        assertThat(marshalled.usedDescriptors(), equalTo(Set.of(descriptorRegistry.getRequiredDescriptor(Object.class))));
+        ClassDescriptor expectedDescriptor = descriptorRegistry.getRequiredDescriptor(Object.class);
+        assertThat(marshalled.usedDescriptorIds(), equalTo(Set.of(expectedDescriptor.descriptorId())));
     }
 
     @Test
@@ -109,10 +113,10 @@ class DefaultUserObjectMarshallerWithBuiltinsTest {
     void marshalsObjectArrayUsingExactlyDescriptorsOfObjectArrayAndComponents() throws Exception {
         MarshalledObject marshalled = marshaller.marshal(new Object[]{42, "abc"});
 
-        assertThat(marshalled.usedDescriptors(), containsInAnyOrder(
-                descriptorRegistry.getRequiredDescriptor(Object[].class),
-                descriptorRegistry.getRequiredDescriptor(Integer.class),
-                descriptorRegistry.getRequiredDescriptor(String.class)
+        assertThat(marshalled.usedDescriptorIds(), containsInAnyOrder(
+                descriptorRegistry.getRequiredDescriptor(Object[].class).descriptorId(),
+                descriptorRegistry.getRequiredDescriptor(Integer.class).descriptorId(),
+                BuiltInType.STRING_LATIN1.descriptorId()
         ));
     }
 
@@ -134,16 +138,16 @@ class DefaultUserObjectMarshallerWithBuiltinsTest {
     void marshalsSimpleEnumsUsingOnlyEnumClassDescriptor() throws Exception {
         MarshalledObject marshalled = marshaller.marshal(SimpleEnum.FIRST);
 
-        assertThat(marshalled.usedDescriptors(), equalTo(Set.of(descriptorRegistry.getRequiredDescriptor(SimpleEnum.class))));
+        ClassDescriptor expectedDescriptor = descriptorRegistry.getRequiredDescriptor(SimpleEnum.class);
+        assertThat(marshalled.usedDescriptorIds(), equalTo(Set.of(expectedDescriptor.descriptorId())));
     }
 
     @Test
     void marshalsEnumsWithAnonClassesForMembersUsingOnlyEnumClassDescriptor() throws Exception {
         MarshalledObject marshalled = marshaller.marshal(EnumWithAnonClassesForMembers.FIRST);
 
-        assertThat(marshalled.usedDescriptors(),
-                equalTo(Set.of(descriptorRegistry.getRequiredDescriptor(EnumWithAnonClassesForMembers.class)))
-        );
+        ClassDescriptor expectedDescriptor = descriptorRegistry.getRequiredDescriptor(EnumWithAnonClassesForMembers.class);
+        assertThat(marshalled.usedDescriptorIds(), equalTo(Set.of(expectedDescriptor.descriptorId())));
     }
 
     @Test
@@ -166,7 +170,7 @@ class DefaultUserObjectMarshallerWithBuiltinsTest {
     void marshalsSimpleEnumInCorrectFormat() throws Exception {
         MarshalledObject marshalled = marshaller.marshal(SimpleEnum.FIRST);
 
-        DataInputStream dis = new DataInputStream(new ByteArrayInputStream(marshalled.bytes()));
+        IgniteDataInput dis = openDataInput(marshalled);
 
         int descriptorId = ProtocolMarshalling.readDescriptorOrCommandId(dis);
         assertThat(descriptorRegistry.getRequiredDescriptor(descriptorId).clazz(), is(SimpleEnum.class));
@@ -174,13 +178,14 @@ class DefaultUserObjectMarshallerWithBuiltinsTest {
         ProtocolMarshalling.readObjectId(dis);
 
         assertThat(dis.readUTF(), is("FIRST"));
+        assertThat(dis.available(), is(0));
     }
 
     @Test
     void marshalsEnumWithAnonClassesForMembersInCorrectFormat() throws Exception {
         MarshalledObject marshalled = marshaller.marshal(EnumWithAnonClassesForMembers.FIRST);
 
-        DataInputStream dis = new DataInputStream(new ByteArrayInputStream(marshalled.bytes()));
+        IgniteDataInput dis = openDataInput(marshalled);
 
         int descriptorId = ProtocolMarshalling.readDescriptorOrCommandId(dis);
         assertThat(descriptorRegistry.getRequiredDescriptor(descriptorId).clazz(), is(EnumWithAnonClassesForMembers.class));
@@ -188,6 +193,7 @@ class DefaultUserObjectMarshallerWithBuiltinsTest {
         ProtocolMarshalling.readObjectId(dis);
 
         assertThat(dis.readUTF(), is("FIRST"));
+        assertThat(dis.available(), is(0));
     }
 
     @ParameterizedTest
@@ -210,7 +216,7 @@ class DefaultUserObjectMarshallerWithBuiltinsTest {
         MarshalledObject marshalled = marshaller.marshal(typeValue.value);
 
         ClassDescriptor expectedDescriptor = descriptorRegistry.getBuiltInDescriptor(typeValue.builtinType);
-        assertThat(marshalled.usedDescriptors(), equalTo(Set.of(expectedDescriptor)));
+        assertThat(marshalled.usedDescriptorIds(), equalTo(Set.of(expectedDescriptor.descriptorId())));
     }
 
     static Stream<Arguments> builtInNonCollectionTypes() {
@@ -224,7 +230,8 @@ class DefaultUserObjectMarshallerWithBuiltinsTest {
                 builtInTypeValue(true, BuiltInType.BOOLEAN_BOXED),
                 builtInTypeValue('a', BuiltInType.CHAR_BOXED),
                 // BARE_OBJECT is handled separately
-                builtInTypeValue("abc", BuiltInType.STRING),
+                builtInTypeValue("abc", BuiltInType.STRING_LATIN1),
+                builtInTypeValue("Привет", BuiltInType.STRING),
                 builtInTypeValue(UUID.fromString("c6f57d4a-619f-11ec-add6-73bc97c3c49e"), BuiltInType.UUID),
                 builtInTypeValue(IgniteUuid.fromString("1234-c6f57d4a-619f-11ec-add6-73bc97c3c49e"),
                         BuiltInType.IGNITE_UUID),
@@ -235,7 +242,8 @@ class DefaultUserObjectMarshallerWithBuiltinsTest {
                 builtInTypeValue(new float[]{1.0f, 2.0f, 3.0f}, BuiltInType.FLOAT_ARRAY),
                 builtInTypeValue(new long[]{1, 2, 3}, BuiltInType.LONG_ARRAY),
                 builtInTypeValue(new double[]{1.0, 2.0, 3.0}, BuiltInType.DOUBLE_ARRAY),
-                builtInTypeValue(new boolean[]{true, false}, BuiltInType.BOOLEAN_ARRAY),
+                builtInTypeValue(new boolean[]{true, false, true}, BuiltInType.BOOLEAN_ARRAY),
+                builtInTypeValue(new boolean[]{true, false, true, false, true, false, true, false, true}, BuiltInType.BOOLEAN_ARRAY),
                 builtInTypeValue(new char[]{'a', 'b'}, BuiltInType.CHAR_ARRAY),
                 builtInTypeValue(new BigDecimal(42), BuiltInType.DECIMAL),
                 builtInTypeValue(BitSet.valueOf(new long[]{42, 43}), BuiltInType.BIT_SET),
@@ -269,9 +277,9 @@ class DefaultUserObjectMarshallerWithBuiltinsTest {
     void marshalsUsingOnlyCorrespondingDescriptorsForBuiltInCollectionTypes(BuiltInTypeValue typeValue) throws Exception {
         MarshalledObject marshalled = marshaller.marshal(typeValue.value);
 
-        assertThat(marshalled.usedDescriptors(), containsInAnyOrder(
-                descriptorRegistry.getBuiltInDescriptor(typeValue.builtinType),
-                descriptorRegistry.getBuiltInDescriptor(BuiltInType.INT_BOXED)
+        assertThat(marshalled.usedDescriptorIds(), containsInAnyOrder(
+                typeValue.builtinType.descriptorId(),
+                BuiltInType.INT_BOXED.descriptorId()
         ));
     }
 
@@ -322,7 +330,7 @@ class DefaultUserObjectMarshallerWithBuiltinsTest {
 
         ClassDescriptor descriptor = descriptorRegistry.getRequiredDescriptor(array.getClass());
 
-        assertThat(marshalled.usedDescriptors(), hasItem(descriptor));
+        assertThat(marshalled.usedDescriptorIds(), hasItem(descriptor.descriptorId()));
     }
 
     @ParameterizedTest
@@ -410,7 +418,7 @@ class DefaultUserObjectMarshallerWithBuiltinsTest {
     void marshalsSimpleEnumArrayCorrectly() throws Exception {
         MarshalledObject marshalled = marshaller.marshal(new SimpleEnum[]{SimpleEnum.FIRST});
 
-        var dis = new DataInputStream(new ByteArrayInputStream(marshalled.bytes()));
+        IgniteDataInput dis = openDataInput(marshalled);
 
         ProtocolMarshalling.readDescriptorOrCommandId(dis);
         ProtocolMarshalling.readObjectId(dis);
@@ -428,7 +436,7 @@ class DefaultUserObjectMarshallerWithBuiltinsTest {
     void marshalsEnumArrayWithValuesOfSimpleEnumCorrectly() throws Exception {
         MarshalledObject marshalled = marshaller.marshal(new Enum[]{SimpleEnum.FIRST});
 
-        var dis = new DataInputStream(new ByteArrayInputStream(marshalled.bytes()));
+        IgniteDataInput dis = openDataInput(marshalled);
 
         ProtocolMarshalling.readDescriptorOrCommandId(dis);
         ProtocolMarshalling.readObjectId(dis);
@@ -446,7 +454,7 @@ class DefaultUserObjectMarshallerWithBuiltinsTest {
     void marshalsArrayOfEnumWithAnonClassesForMembersCorrectly() throws Exception {
         MarshalledObject marshalled = marshaller.marshal(new EnumWithAnonClassesForMembers[]{EnumWithAnonClassesForMembers.FIRST});
 
-        var dis = new DataInputStream(new ByteArrayInputStream(marshalled.bytes()));
+        IgniteDataInput dis = openDataInput(marshalled);
 
         ProtocolMarshalling.readDescriptorOrCommandId(dis);
         ProtocolMarshalling.readObjectId(dis);
@@ -460,7 +468,7 @@ class DefaultUserObjectMarshallerWithBuiltinsTest {
         assertThat(dis.available(), is(0));
     }
 
-    private void skipOneByteEmptyNullBitMask(DataInputStream dis) throws IOException {
+    private void skipOneByteEmptyNullBitMask(DataInput dis) throws IOException {
         assertThat(dis.readByte(), is((byte) 0));
     }
 
@@ -468,7 +476,7 @@ class DefaultUserObjectMarshallerWithBuiltinsTest {
     void marshalsEnumArrayWithValuesOfEnumWithAnonClassesForMembersValuesCorrectly() throws Exception {
         MarshalledObject marshalled = marshaller.marshal(new Enum[]{EnumWithAnonClassesForMembers.FIRST});
 
-        var dis = new DataInputStream(new ByteArrayInputStream(marshalled.bytes()));
+        IgniteDataInput dis = openDataInput(marshalled);
 
         ProtocolMarshalling.readDescriptorOrCommandId(dis);
         ProtocolMarshalling.readObjectId(dis);
@@ -486,7 +494,7 @@ class DefaultUserObjectMarshallerWithBuiltinsTest {
     void marshalsEmptyAbstractEnumArrayCorrectly() throws Exception {
         MarshalledObject marshalled = marshaller.marshal(new Enum[]{});
 
-        var dis = new DataInputStream(new ByteArrayInputStream(marshalled.bytes()));
+        IgniteDataInput dis = openDataInput(marshalled);
 
         ProtocolMarshalling.readDescriptorOrCommandId(dis);
         ProtocolMarshalling.readObjectId(dis);
@@ -497,11 +505,15 @@ class DefaultUserObjectMarshallerWithBuiltinsTest {
         assertThat(dis.available(), is(0));
     }
 
+    private IgniteDataInput openDataInput(MarshalledObject marshalled) {
+        return new IgniteUnsafeDataInput(marshalled.bytes());
+    }
+
     @Test
     void marshalsEmptySimpleEnumArrayCorrectly() throws Exception {
         MarshalledObject marshalled = marshaller.marshal(new SimpleEnum[]{});
 
-        var dis = new DataInputStream(new ByteArrayInputStream(marshalled.bytes()));
+        IgniteDataInput dis = openDataInput(marshalled);
 
         ProtocolMarshalling.readDescriptorOrCommandId(dis);
         ProtocolMarshalling.readObjectId(dis);
@@ -516,7 +528,7 @@ class DefaultUserObjectMarshallerWithBuiltinsTest {
     void marshalsEmptyArrayOfEnumWithAnonClassesForMembersCorrectly() throws Exception {
         MarshalledObject marshalled = marshaller.marshal(new EnumWithAnonClassesForMembers[]{});
 
-        var dis = new DataInputStream(new ByteArrayInputStream(marshalled.bytes()));
+        IgniteDataInput dis = openDataInput(marshalled);
 
         ProtocolMarshalling.readDescriptorOrCommandId(dis);
         ProtocolMarshalling.readObjectId(dis);
@@ -527,6 +539,22 @@ class DefaultUserObjectMarshallerWithBuiltinsTest {
         assertThat(dis.available(), is(0));
     }
 
+    @ParameterizedTest
+    @MethodSource("testStrings")
+    void marshalsAndUnmarshalsStrings(String str) throws Exception {
+        String unmarshalled = marshalAndUnmarshal(str);
+
+        assertThat(unmarshalled, is(equalTo(str)));
+    }
+
+    private static Stream<Arguments> testStrings() {
+        return Stream.of(
+                "ASCII",
+                "Not ASCII, but Latin-1: é",
+                "Ютиэф-8"
+        ).map(Arguments::of);
+    }
+
     private static class BuiltInTypeValue {
         private final Object value;
         private final BuiltInType builtinType;
diff --git a/modules/network/src/test/java/org/apache/ignite/internal/network/serialization/marshal/DefaultUserObjectMarshallerWithExternalizableTest.java b/modules/network/src/test/java/org/apache/ignite/internal/network/serialization/marshal/DefaultUserObjectMarshallerWithExternalizableTest.java
index e7f33a0..e088eb1 100644
--- a/modules/network/src/test/java/org/apache/ignite/internal/network/serialization/marshal/DefaultUserObjectMarshallerWithExternalizableTest.java
+++ b/modules/network/src/test/java/org/apache/ignite/internal/network/serialization/marshal/DefaultUserObjectMarshallerWithExternalizableTest.java
@@ -62,9 +62,8 @@ class DefaultUserObjectMarshallerWithExternalizableTest {
     void usesExactlyOneDescriptorWhenMarshallingExternalizable() throws Exception {
         MarshalledObject marshalled = marshaller.marshal(new SimpleExternalizable(42));
 
-        ClassDescriptor expectedDescriptor = descriptorRegistry.getDescriptor(SimpleExternalizable.class);
-        assertThat(expectedDescriptor, is(notNullValue()));
-        assertThat(marshalled.usedDescriptors(), is(equalTo(Set.of(expectedDescriptor))));
+        ClassDescriptor expectedDescriptor = descriptorRegistry.getRequiredDescriptor(SimpleExternalizable.class);
+        assertThat(marshalled.usedDescriptorIds(), is(equalTo(Set.of(expectedDescriptor.descriptorId()))));
     }
 
     @Test
@@ -113,7 +112,7 @@ class DefaultUserObjectMarshallerWithExternalizableTest {
         MarshalledObject marshalled = marshaller.marshal(new ExternalizableWithReplaceWithSimple(42));
 
         ClassDescriptor replacementDescriptor = descriptorRegistry.getRequiredDescriptor(SimpleExternalizable.class);
-        assertThat(marshalled.usedDescriptors(), equalTo(Set.of(replacementDescriptor)));
+        assertThat(marshalled.usedDescriptorIds(), equalTo(Set.of(replacementDescriptor.descriptorId())));
     }
 
     @Test
@@ -130,7 +129,7 @@ class DefaultUserObjectMarshallerWithExternalizableTest {
         MarshalledObject marshalled = marshaller.marshal(new ExternalizableWithReplaceWithNull(42));
 
         ClassDescriptor replacementDescriptor = descriptorRegistry.getNullDescriptor();
-        assertThat(marshalled.usedDescriptors(), equalTo(Set.of(replacementDescriptor)));
+        assertThat(marshalled.usedDescriptorIds(), equalTo(Set.of(replacementDescriptor.descriptorId())));
     }
 
     @Test
diff --git a/modules/network/src/test/java/org/apache/ignite/internal/network/serialization/marshal/DefaultUserObjectMarshallerWithSerializableOverrideStreamsTest.java b/modules/network/src/test/java/org/apache/ignite/internal/network/serialization/marshal/DefaultUserObjectMarshallerWithSerializableOverrideStreamsTest.java
index 916edaf..edd3c8a 100644
--- a/modules/network/src/test/java/org/apache/ignite/internal/network/serialization/marshal/DefaultUserObjectMarshallerWithSerializableOverrideStreamsTest.java
+++ b/modules/network/src/test/java/org/apache/ignite/internal/network/serialization/marshal/DefaultUserObjectMarshallerWithSerializableOverrideStreamsTest.java
@@ -34,10 +34,8 @@ import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
 import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertThrows;
 
-import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
 import java.io.DataInput;
-import java.io.DataInputStream;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.InvalidObjectException;
@@ -50,6 +48,8 @@ import java.util.Objects;
 import java.util.stream.Stream;
 import org.apache.ignite.internal.network.serialization.ClassDescriptorFactory;
 import org.apache.ignite.internal.network.serialization.ClassDescriptorRegistry;
+import org.apache.ignite.internal.util.io.IgniteDataInput;
+import org.apache.ignite.internal.util.io.IgniteUnsafeDataInput;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.params.ParameterizedTest;
@@ -124,7 +124,7 @@ class DefaultUserObjectMarshallerWithSerializableOverrideStreamsTest {
     }
 
     private byte[] readOverrideBytes(MarshalledObject marshalled) throws IOException {
-        DataInputStream dis = new DataInputStream(new ByteArrayInputStream(marshalled.bytes()));
+        IgniteDataInput dis = new IgniteUnsafeDataInput(marshalled.bytes());
 
         ProtocolMarshalling.readDescriptorOrCommandId(dis);
         ProtocolMarshalling.readObjectId(dis);
@@ -143,7 +143,7 @@ class DefaultUserObjectMarshallerWithSerializableOverrideStreamsTest {
                 new ReadWriteSpec<>("double", oos -> oos.writeDouble(42.0), DataInput::readDouble, 42.0),
                 new ReadWriteSpec<>("char", oos -> oos.writeChar('a'), DataInput::readChar, 'a'),
                 new ReadWriteSpec<>("boolean", oos -> oos.writeBoolean(true), DataInput::readBoolean, true),
-                new ReadWriteSpec<>("stream byte", oos -> oos.write(42), ObjectInputStream::read, DataInputStream::read, 42),
+                new ReadWriteSpec<>("stream byte", oos -> oos.write(42), ObjectInputStream::read, IgniteDataInput::read, 42),
                 new ReadWriteSpec<>(
                         "byte array",
                         oos -> oos.write(new byte[]{42, 43}),
@@ -197,7 +197,7 @@ class DefaultUserObjectMarshallerWithSerializableOverrideStreamsTest {
                         "readAllBytes",
                         oos -> oos.write(new byte[]{42, 43}),
                         InputStream::readAllBytes,
-                        InputStream::readAllBytes,
+                        IgniteDataInput::readAllBytes,
                         new byte[]{42, 43}
                 ),
                 new ReadWriteSpec<>(
@@ -226,6 +226,14 @@ class DefaultUserObjectMarshallerWithSerializableOverrideStreamsTest {
     }
 
     @SuppressWarnings("SameParameterValue")
+    private static byte[] readBytes(IgniteDataInput is, int count) throws IOException {
+        byte[] bytes = new byte[count];
+        int read = is.read(bytes);
+        assertThat(read, is(count));
+        return bytes;
+    }
+
+    @SuppressWarnings("SameParameterValue")
     private static byte[] readRange(InputStream is, int count) throws IOException {
         byte[] bytes = new byte[count];
         int read = is.read(bytes, 0, count);
@@ -233,6 +241,14 @@ class DefaultUserObjectMarshallerWithSerializableOverrideStreamsTest {
         return bytes;
     }
 
+    @SuppressWarnings("SameParameterValue")
+    private static byte[] readRange(IgniteDataInput is, int count) throws IOException {
+        byte[] bytes = new byte[count];
+        int read = is.read(bytes, 0, count);
+        assertThat(read, is(count));
+        return bytes;
+    }
+
     private static byte[] readBytesFully(DataInput is, int count) throws IOException {
         byte[] bytes = new byte[count];
         is.readFully(bytes);
@@ -253,9 +269,16 @@ class DefaultUserObjectMarshallerWithSerializableOverrideStreamsTest {
         return bytes;
     }
 
-    private static <T> T consumeAndUnmarshal(DataInputStream stream) throws IOException {
+    @SuppressWarnings("SameParameterValue")
+    private static byte[] readNumBytesRange(IgniteDataInput is, int count) throws IOException {
+        byte[] bytes = new byte[count];
+        is.readFewBytes(bytes, 0, count);
+        return bytes;
+    }
+
+    private static <T> T consumeAndUnmarshal(IgniteDataInput stream) throws IOException {
         ByteArrayOutputStream baos = new ByteArrayOutputStream();
-        stream.transferTo(baos);
+        baos.write(stream.readAllBytes());
 
         try {
             return staticMarshaller.unmarshal(baos.toByteArray(), staticDescriptorRegistry);
@@ -585,13 +608,13 @@ class DefaultUserObjectMarshallerWithSerializableOverrideStreamsTest {
         MarshalledObject marshalled = marshaller.marshal(original);
 
         byte[] overrideBytes = readOverrideBytes(marshalled);
-        DataInputStream dis = new DataInputStream(new ByteArrayInputStream(overrideBytes));
+        var dis = new IgniteUnsafeDataInput(overrideBytes);
 
         assertThat(dis.readInt(), is(42));
         assertThatDrained(dis);
     }
 
-    private void assertThatDrained(DataInputStream dis) throws IOException {
+    private void assertThatDrained(InputStream dis) throws IOException {
         assertThat("Stream is not drained", dis.read(), is(lessThan(0)));
     }
 
@@ -630,7 +653,7 @@ class DefaultUserObjectMarshallerWithSerializableOverrideStreamsTest {
     }
 
     private interface DataReader<T> {
-        T readFrom(DataInputStream stream) throws IOException;
+        T readFrom(IgniteDataInput stream) throws IOException;
     }
 
     private interface InputReader<T> {
@@ -667,7 +690,7 @@ class DefaultUserObjectMarshallerWithSerializableOverrideStreamsTest {
         }
 
         private T parseOverrideValue(byte[] overrideBytes) throws IOException {
-            DataInputStream dis = new DataInputStream(new ByteArrayInputStream(overrideBytes));
+            IgniteDataInput dis = new IgniteUnsafeDataInput(overrideBytes);
             return dataReader.readFrom(dis);
         }
 
diff --git a/modules/network/src/test/java/org/apache/ignite/internal/network/serialization/marshal/DefaultUserObjectMarshallerWithSerializableTest.java b/modules/network/src/test/java/org/apache/ignite/internal/network/serialization/marshal/DefaultUserObjectMarshallerWithSerializableTest.java
index 1541413..072b178 100644
--- a/modules/network/src/test/java/org/apache/ignite/internal/network/serialization/marshal/DefaultUserObjectMarshallerWithSerializableTest.java
+++ b/modules/network/src/test/java/org/apache/ignite/internal/network/serialization/marshal/DefaultUserObjectMarshallerWithSerializableTest.java
@@ -111,10 +111,10 @@ class DefaultUserObjectMarshallerWithSerializableTest {
         MarshalledObject marshalled = marshaller.marshal(new SerializableWithReplaceWithSimple(42));
 
         ClassDescriptor originalDescriptor = descriptorRegistry.getRequiredDescriptor(SerializableWithReplaceWithSimple.class);
-        assertThat(marshalled.usedDescriptors(), not(hasItem(originalDescriptor)));
+        assertThat(marshalled.usedDescriptorIds(), not(hasItem(originalDescriptor.descriptorId())));
 
         ClassDescriptor replacementDescriptor = descriptorRegistry.getRequiredDescriptor(SimpleSerializable.class);
-        assertThat(marshalled.usedDescriptors(), hasItem(replacementDescriptor));
+        assertThat(marshalled.usedDescriptorIds(), hasItem(replacementDescriptor.descriptorId()));
     }
 
     @Test
@@ -131,7 +131,7 @@ class DefaultUserObjectMarshallerWithSerializableTest {
         MarshalledObject marshalled = marshaller.marshal(new SerializableWithReplaceWithNull(42));
 
         ClassDescriptor replacementDescriptor = descriptorRegistry.getNullDescriptor();
-        assertThat(marshalled.usedDescriptors(), equalTo(Set.of(replacementDescriptor)));
+        assertThat(marshalled.usedDescriptorIds(), equalTo(Set.of(replacementDescriptor.descriptorId())));
     }
 
     @Test
diff --git a/parent/pom.xml b/parent/pom.xml
index 0d90766..2c2a1f9 100644
--- a/parent/pom.xml
+++ b/parent/pom.xml
@@ -86,6 +86,7 @@
         <msgpack.version>0.8.21</msgpack.version>
         <caffeine.version>3.0.4</caffeine.version>
         <fastutil.version>8.5.6</fastutil.version>
+        <kryo.version>4.0.1</kryo.version>
 
         <!-- Plugins versions -->
         <apache.rat.plugin.version>0.13</apache.rat.plugin.version>
@@ -593,6 +594,13 @@
                 <version>${jmh.framework.version}</version>
             </dependency>
 
+            <!-- We currently only use Kryo for benchmarking our User Object Serialization -->
+            <dependency>
+                <groupId>com.esotericsoftware</groupId>
+                <artifactId>kryo</artifactId>
+                <version>${kryo.version}</version>
+            </dependency>
+
             <dependency>
                 <groupId>org.ow2.asm</groupId>
                 <artifactId>asm</artifactId>