You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by am...@apache.org on 2021/06/09 19:36:15 UTC

[ignite-3] 03/15: Optimize serialized row size.

This is an automated email from the ASF dual-hosted git repository.

amashenkov pushed a commit to branch ignite-14743
in repository https://gitbox.apache.org/repos/asf/ignite-3.git

commit 0d336b6ca459283d4184ceb2997b0a649bc392be
Author: Andrew Mashenkov <an...@gmail.com>
AuthorDate: Thu Jun 3 15:21:53 2021 +0300

    Optimize serialized row size.
---
 .../apache/ignite/internal/util/Constants.java}    |  37 +-
 .../apache/ignite/internal/schema/BinaryRow.java   |   8 +-
 .../ignite/internal/schema/ByteBufferRow.java      |  10 +-
 .../ignite/internal/schema/NativeTypeSpec.java     |   1 +
 .../org/apache/ignite/internal/schema/README.md    |   6 +-
 .../ignite/internal/schema/RowAssembler.java       | 605 ---------------------
 .../schema/marshaller/AbstractSerializer.java      |   4 +-
 .../internal/schema/marshaller/MarshallerUtil.java |   2 +-
 .../marshaller/asm/AsmSerializerGenerator.java     |  25 +-
 .../marshaller/reflection/FieldAccessor.java       |   4 +-
 .../marshaller/reflection/JavaSerializer.java      |  10 +-
 .../schema/marshaller/reflection/Marshaller.java   |   4 +-
 .../internal/schema/row/AbstractChunkWriter.java   | 228 ++++++++
 .../schema/{ => row}/ExpandableByteBuf.java        |  31 +-
 .../internal/schema/row/LongChunkWriter.java       |  78 +++
 .../ignite/internal/schema/{ => row}/Row.java      | 451 +++++++++------
 .../ignite/internal/schema/row/RowAssembler.java   | 485 +++++++++++++++++
 .../internal/schema/row/TinyChunkWriter.java       |  77 +++
 .../ignite/internal/schema/row/XXHash32.java       | 243 +++++++++
 .../internal/schema/ExpandableByteBufTest.java     |   1 +
 .../ignite/internal/schema/RowAssemblerTest.java   | 129 ++---
 .../org/apache/ignite/internal/schema/RowTest.java |  37 +-
 .../marshaller/reflection/FieldAccessorTest.java   |   4 +-
 .../ignite/distributed/ITDistributedTableTest.java |   8 +-
 .../internal/schema/marshaller/KVSerializer.java   |   2 +-
 .../schema/marshaller/RecordSerializer.java        |   2 +-
 .../schema/marshaller/TupleMarshaller.java         |   2 +-
 .../ignite/internal/table/KVBinaryViewImpl.java    |   2 +-
 .../apache/ignite/internal/table/KVViewImpl.java   |   2 +-
 .../ignite/internal/table/RecordViewImpl.java      |   2 +-
 .../ignite/internal/table/RowChunkAdapter.java     |   2 +-
 .../apache/ignite/internal/table/TableImpl.java    |   2 +-
 .../org/apache/ignite/internal/table/TableRow.java |   2 +-
 .../ignite/internal/table/TupleMarshallerImpl.java |  96 ++--
 .../raft/PartitionCommandListenerTest.java         |   8 +-
 35 files changed, 1597 insertions(+), 1013 deletions(-)

diff --git a/modules/table/src/main/java/org/apache/ignite/internal/schema/marshaller/KVSerializer.java b/modules/core/src/main/java/org/apache/ignite/internal/util/Constants.java
similarity index 52%
copy from modules/table/src/main/java/org/apache/ignite/internal/schema/marshaller/KVSerializer.java
copy to modules/core/src/main/java/org/apache/ignite/internal/util/Constants.java
index 5c4f32d..cf24696 100644
--- a/modules/table/src/main/java/org/apache/ignite/internal/schema/marshaller/KVSerializer.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/Constants.java
@@ -15,32 +15,23 @@
  * limitations under the License.
  */
 
-package org.apache.ignite.internal.schema.marshaller;
-
-import org.apache.ignite.internal.schema.Row;
-import org.jetbrains.annotations.NotNull;
-import org.jetbrains.annotations.Nullable;
+package org.apache.ignite.internal.util;
 
 /**
- * Key-value serializer interface.
+ * Utility class with magic constants.
  */
-public interface KVSerializer<K, V> {
-    /**
-     * @param key Key object to serialize.
-     * @param val Value object to serialize.
-     * @return Table row with columns serialized from given key-value pair.
-     */
-    Row serialize(@NotNull K key, V val);
+public final class Constants {
+    /** Bytes in kilo-byte  (IEC 80000-13).. */
+    public static final int KiB =  1024;
+
+    /** Bytes in mega-byte (IEC 80000-13). */
+    public static final int MiB =  1024 * KiB;
 
-    /**
-     * @param row Table row.
-     * @return Deserialized key object.
-     */
-    @NotNull K deserializeKey(@NotNull Row row);
+    /** Bytes in giga-byte (IEC 80000-13). */
+    public static final int GiB =  1024 * MiB;
 
-    /**
-     * @param row Table row.
-     * @return Deserialized value object.
-     */
-    @Nullable V deserializeValue(@NotNull Row row);
+    /** Stub. */
+    private Constants() {
+        //Noop.
+    }
 }
diff --git a/modules/schema/src/main/java/org/apache/ignite/internal/schema/BinaryRow.java b/modules/schema/src/main/java/org/apache/ignite/internal/schema/BinaryRow.java
index 74be9fb..83a73e0 100644
--- a/modules/schema/src/main/java/org/apache/ignite/internal/schema/BinaryRow.java
+++ b/modules/schema/src/main/java/org/apache/ignite/internal/schema/BinaryRow.java
@@ -157,13 +157,11 @@ public interface BinaryRow {
         /** Flag indicates value chunk omits varlen table. */
         public static final int OMIT_VAL_VARTBL_FLAG = 1 << 4;
 
+        /** Flag indicates key chunk is written in Tiny format. */
         public static final int KEY_TYNY_FORMAT = 1 << 5;
 
-        public static final int KEY_LARGE_ROW_FORMAT = 1 << 6;
-
-        public static final int VAL_TYNY_FORMAT = 1 << 7;
-
-        public static final int VAL_LARGE_FORMAT = 1 << 8;
+        /** Flag indicates value chunk is written in Tiny format. */
+        public static final int VAL_TYNY_FORMAT = 1 << 6;
 
         /** Stub. */
         private RowFlags() {
diff --git a/modules/schema/src/main/java/org/apache/ignite/internal/schema/ByteBufferRow.java b/modules/schema/src/main/java/org/apache/ignite/internal/schema/ByteBufferRow.java
index 9c9c81e..0f3b484 100644
--- a/modules/schema/src/main/java/org/apache/ignite/internal/schema/ByteBufferRow.java
+++ b/modules/schema/src/main/java/org/apache/ignite/internal/schema/ByteBufferRow.java
@@ -132,9 +132,7 @@ public class ByteBufferRow implements BinaryRow {
         final short flags = readShort(FLAGS_FIELD_OFFSET);
 
         final int off = KEY_CHUNK_OFFSET;
-        final int len = (flags & RowFlags.KEY_LARGE_ROW_FORMAT) != 0 ? readInteger(off) :
-            (flags & RowFlags.KEY_TYNY_FORMAT) != 0 ? readByte(off) :
-                readShort(off);
+        final int len = (flags & RowFlags.KEY_TYNY_FORMAT) == 0 ? readInteger(off) : (readByte(off) & 0xFF);
 
         try {
             return buf.limit(off + len).position(off).slice();
@@ -150,11 +148,9 @@ public class ByteBufferRow implements BinaryRow {
         final short flags = readShort(FLAGS_FIELD_OFFSET);
 
         int off = KEY_CHUNK_OFFSET +
-            ((flags & RowFlags.KEY_LARGE_ROW_FORMAT) != 0 ? readInteger(KEY_CHUNK_OFFSET) :
-            (flags & RowFlags.KEY_TYNY_FORMAT) != 0 ? readByte(KEY_CHUNK_OFFSET) : readShort(KEY_CHUNK_OFFSET));
+            ((flags & RowFlags.KEY_TYNY_FORMAT) == 0 ? readInteger(KEY_CHUNK_OFFSET) : (readByte(KEY_CHUNK_OFFSET) & 0xFF));
 
-        int len = hasValue() ? (flags & RowFlags.VAL_LARGE_FORMAT) != 0 ? readInteger(off) :
-            (flags & RowFlags.VAL_TYNY_FORMAT) != 0 ? readByte(off) : readShort(off) : 0;
+        int len = hasValue() ? (flags & RowFlags.VAL_TYNY_FORMAT) == 0 ? readInteger(off) : (readByte(off) & 0xFF) : 0;
 
         try {
             return buf.limit(off + len).position(off).slice();
diff --git a/modules/schema/src/main/java/org/apache/ignite/internal/schema/NativeTypeSpec.java b/modules/schema/src/main/java/org/apache/ignite/internal/schema/NativeTypeSpec.java
index 75ebd5b..f853766 100644
--- a/modules/schema/src/main/java/org/apache/ignite/internal/schema/NativeTypeSpec.java
+++ b/modules/schema/src/main/java/org/apache/ignite/internal/schema/NativeTypeSpec.java
@@ -18,6 +18,7 @@
 package org.apache.ignite.internal.schema;
 
 import java.util.BitSet;
+import org.apache.ignite.internal.schema.row.Row;
 import org.apache.ignite.internal.tostring.S;
 
 /**
diff --git a/modules/schema/src/main/java/org/apache/ignite/internal/schema/README.md b/modules/schema/src/main/java/org/apache/ignite/internal/schema/README.md
index 1909628..10143c7 100644
--- a/modules/schema/src/main/java/org/apache/ignite/internal/schema/README.md
+++ b/modules/schema/src/main/java/org/apache/ignite/internal/schema/README.md
@@ -73,7 +73,7 @@ varsize columns multiplied by 2 (a single entry in the offsets table is 2 bytes)
 is calculated from the beginning of the chunk.
 
 ### Row construction and access
-To assemble a row with some schema, an instance of `org.apache.ignite.internal.schema.RowAssembler`
+To assemble a row with some schema, an instance of `org.apache.ignite.internal.schema.row.RowAssembler`
 must be used which provides the low-level API for building rows. When using the row assembler, the
 columns must be passed to the assembler in the internal schema sort order. Additionally, when constructing
 the instance of the assembler, the user should pre-calculate the size of the row to avoid extra array copies,
@@ -81,7 +81,7 @@ and the number of non-null varlen columns for key and value chunks. Less restric
 are provided by class (de)serializers and row builder, which take care of sizing and column order.
 
 To read column values of a row, one needs to construct a subclass of
-`org.apache.ignite.internal.schema.Row` which provides necessary logic to read arbitrary columns with
-type checking. For primitive types, `org.apache.ignite.internal.schema.Row` provides boxed and non-boxed
+`org.apache.ignite.internal.schema.row.Row` which provides necessary logic to read arbitrary columns with
+type checking. For primitive types, `org.apache.ignite.internal.schema.row.Row` provides boxed and non-boxed
 value methods to avoid boxing in scenarios where boxing can be avoided (deserialization of non-null columns to
 POJO primitives, for example).
diff --git a/modules/schema/src/main/java/org/apache/ignite/internal/schema/RowAssembler.java b/modules/schema/src/main/java/org/apache/ignite/internal/schema/RowAssembler.java
deleted file mode 100644
index 021d72b..0000000
--- a/modules/schema/src/main/java/org/apache/ignite/internal/schema/RowAssembler.java
+++ /dev/null
@@ -1,605 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.schema;
-
-import java.nio.charset.CharacterCodingException;
-import java.nio.charset.CharsetEncoder;
-import java.nio.charset.StandardCharsets;
-import java.util.Arrays;
-import java.util.BitSet;
-import java.util.UUID;
-import org.apache.ignite.internal.schema.BinaryRow.RowFlags;
-
-import static org.apache.ignite.internal.schema.BinaryRow.RowFlags.OMIT_KEY_VARTBL_FLAG;
-import static org.apache.ignite.internal.schema.BinaryRow.RowFlags.OMIT_VAL_VARTBL_FLAG;
-import static org.apache.ignite.internal.schema.BinaryRow.VARLEN_COLUMN_OFFSET_FIELD_SIZE;
-import static org.apache.ignite.internal.schema.BinaryRow.VARLEN_TABLE_SIZE_FIELD_SIZE;
-
-/**
- * Utility class to build rows using column appending pattern. The external user of this class must consult
- * with the schema and provide the columns in strict internal column sort order during the row construction.
- * Additionally, the user of this class should pre-calculate the resulting row size when possible to avoid
- * unnecessary data copies. The assembler provides some utility methods to calculate the resulting row size
- * based on the number of null columns and size calculation for strings.
- *
- * @see #rowSize(Columns, int, int, Columns, int, int)
- * @see #rowChunkSize(Columns, int, int)
- * @see #utf8EncodedLength(CharSequence)
- */
-public class RowAssembler {
-    /** Schema. */
-    private final SchemaDescriptor schema;
-
-    /** The number of non-null varlen columns in values chunk. */
-    private final int nonNullVarlenValCols;
-
-    /** Target byte buffer to write to. */
-    private final ExpandableByteBuf buf;
-
-    private final int valOffSize;
-
-    /** Current columns chunk. */
-    private Columns curCols;
-
-    /** Current field index (the field is unset). */
-    private int curCol;
-
-    /** Index of the current varlen table entry. Incremented each time non-null varlen column is appended. */
-    private int curVarlenTblEntry;
-
-    /** Current offset for the next column to be appended. */
-    private int curOff;
-
-    /** Base offset of the current chunk */
-    private int baseOff;
-
-    /** Offset of the null map for current chunk. */
-    private int nullMapOff;
-
-    /** Offset of the varlen table for current chunk. */
-    private int varlenTblChunkOff;
-
-    private int adaptiveItemSize;
-
-    /** Row hashcode. */
-    private int keyHash;
-
-    /** Flags. */
-    private short flags;
-
-    /** Charset encoder for strings. Initialized lazily. */
-    private CharsetEncoder strEncoder;
-
-    /**
-     * @param nonNullVarlenCols Number of non-null varlen columns.
-     * @return Total size of the varlen table.
-     */
-    public static int varlenTableChunkSize(int nonNullVarlenCols) {
-        return nonNullVarlenCols == 0 ? 0 :
-            VARLEN_TABLE_SIZE_FIELD_SIZE + nonNullVarlenCols * VARLEN_COLUMN_OFFSET_FIELD_SIZE;
-    }
-
-    /**
-     * Calculates encoded string length.
-     *
-     * @param seq Char sequence.
-     * @return Encoded string length.
-     * @implNote This implementation is not tolerant to malformed char sequences.
-     */
-    public static int utf8EncodedLength(CharSequence seq) {
-        int cnt = 0;
-
-        for (int i = 0, len = seq.length(); i < len; i++) {
-            char ch = seq.charAt(i);
-
-            if (ch <= 0x7F)
-                cnt++;
-            else if (ch <= 0x7FF)
-                cnt += 2;
-            else if (Character.isHighSurrogate(ch)) {
-                cnt += 4;
-                ++i;
-            }
-            else
-                cnt += 3;
-        }
-
-        return cnt;
-    }
-
-    /**
-     * @param keyCols Key columns.
-     * @param nonNullVarlenKeyCols Number of non-null varlen columns in key chunk.
-     * @param nonNullVarlenKeySize Size of non-null varlen columns in key chunk.
-     * @param valCols Value columns.
-     * @param nonNullVarlenValCols Number of non-null varlen columns in value chunk.
-     * @param nonNullVarlenValSize Size of non-null varlen columns in value chunk.
-     * @return Total row size.
-     */
-    public static int rowSize(
-        Columns keyCols,
-        int nonNullVarlenKeyCols,
-        int nonNullVarlenKeySize,
-        Columns valCols,
-        int nonNullVarlenValCols,
-        int nonNullVarlenValSize
-    ) {
-        return BinaryRow.KEY_CHUNK_OFFSET /* Header size */ +
-            rowChunkSize(keyCols, nonNullVarlenKeyCols, nonNullVarlenKeySize) +
-            rowChunkSize(valCols, nonNullVarlenValCols, nonNullVarlenValSize);
-    }
-
-    /**
-     * @param cols Columns.
-     * @param nonNullVarlenCols Number of non-null varlen columns in chunk.
-     * @param nonNullVarlenSize Size of non-null varlen columns in chunk.
-     * @return Row's chunk size.
-     */
-    static int rowChunkSize(Columns cols, int nonNullVarlenCols, int nonNullVarlenSize) {
-        int size = BinaryRow.CHUNK_LEN_FIELD_SIZE + cols.nullMapSize() +
-            varlenTableChunkSize(nonNullVarlenCols);
-
-        for (int i = 0; i < cols.numberOfFixsizeColumns(); i++)
-            size += cols.column(i).type().sizeInBytes();
-
-        return size + nonNullVarlenSize;
-    }
-    public RowAssembler(
-        SchemaDescriptor schema,
-        int size,
-        int nonNullVarlenKeyCols,
-        int nonNullVarlenValCols
-    ) {
-        this(schema, size, 2, nonNullVarlenKeyCols, 2, nonNullVarlenValCols);
-    }
-
-    /**
-     * @param schema Row schema.
-     * @param size Target row size. If the row size is known in advance, it should be provided upfront to avoid
-     * unnecessary arrays copy.
-     * @param nonNullVarlenKeyCols Number of non-null varlen columns in key chunk.
-     * @param nonNullVarlenValCols Number of non-null varlen columns in value chunk.
-     */
-    public RowAssembler(
-        SchemaDescriptor schema,
-        int size,
-        int keyOffSize,
-        int nonNullVarlenKeyCols,
-        int valOffSize,
-        int nonNullVarlenValCols
-    ) {
-        this.schema = schema;
-        this.nonNullVarlenValCols = nonNullVarlenValCols;
-        this.valOffSize = valOffSize;
-
-        curCols = schema.keyColumns();
-        flags = 0;
-        keyHash = 0;
-        strEncoder = null;
-
-        initOffsets(BinaryRow.KEY_CHUNK_OFFSET, keyOffSize, nonNullVarlenKeyCols);
-
-        if (schema.keyColumns().nullMapSize() == 0)
-            flags |= RowFlags.OMIT_KEY_NULL_MAP_FLAG;
-
-        if (schema.valueColumns().nullMapSize() == 0)
-            flags |= RowFlags.OMIT_VAL_NULL_MAP_FLAG;
-
-        if (keyOffSize == 1)
-            flags |= RowFlags.KEY_TYNY_FORMAT;
-        else if (keyOffSize != 2)
-            flags |= RowFlags.KEY_LARGE_ROW_FORMAT;
-
-        if (valOffSize == 1)
-            flags |= RowFlags.VAL_TYNY_FORMAT;
-        else if (valOffSize != 2)
-            flags |= RowFlags.VAL_LARGE_FORMAT;
-
-        buf = new ExpandableByteBuf(size);
-
-        buf.putShort(0, (short)schema.version());
-
-        if (nonNullVarlenKeyCols == 0)
-            flags |= OMIT_KEY_VARTBL_FLAG;
-        else
-            writeAdaptiveItem(varlenTblChunkOff, (short)nonNullVarlenKeyCols);
-    }
-
-    public static int vartableOffSize(int vartableItems, int dataSize) {
-        if (dataSize + vartableItems + 2 < (1 << 8))
-            return 1;
-        if (dataSize + (vartableItems + 2) * 2 < (1 << 16))
-            return 2;
-        else
-            return 4;
-    }
-
-    /**
-     * Appends {@code null} value for the current column to the chunk.
-     */
-    public void appendNull() {
-        Column col = curCols.column(curCol);
-
-        if (!col.nullable())
-            throw new IllegalArgumentException("Failed to set column (null was passed, but column is not nullable): " +
-                col);
-
-        setNull(curCol);
-
-        if (isKeyColumn())
-            keyHash *= 31;
-
-        shiftColumn(0, false);
-    }
-
-    /**
-     * Appends byte value for the current column to the chunk.
-     *
-     * @param val Column value.
-     */
-    public void appendByte(byte val) {
-        checkType(NativeTypes.BYTE);
-
-        buf.put(curOff, val);
-
-        if (isKeyColumn())
-            keyHash = 31 * keyHash + Byte.hashCode(val);
-
-        shiftColumn(NativeTypes.BYTE);
-    }
-
-    /**
-     * Appends short value for the current column to the chunk.
-     *
-     * @param val Column value.
-     */
-    public void appendShort(short val) {
-        checkType(NativeTypes.SHORT);
-
-        buf.putShort(curOff, val);
-
-        if (isKeyColumn())
-            keyHash = 31 * keyHash + Short.hashCode(val);
-
-        shiftColumn(NativeTypes.SHORT);
-    }
-
-    /**
-     * Appends int value for the current column to the chunk.
-     *
-     * @param val Column value.
-     */
-    public void appendInt(int val) {
-        checkType(NativeTypes.INTEGER);
-
-        buf.putInt(curOff, val);
-
-        if (isKeyColumn())
-            keyHash = 31 * keyHash + Integer.hashCode(val);
-
-        shiftColumn(NativeTypes.INTEGER);
-    }
-
-    /**
-     * Appends long value for the current column to the chunk.
-     *
-     * @param val Column value.
-     */
-    public void appendLong(long val) {
-        checkType(NativeTypes.LONG);
-
-        buf.putLong(curOff, val);
-
-        if (isKeyColumn())
-            keyHash += 31 * keyHash + Long.hashCode(val);
-
-        shiftColumn(NativeTypes.LONG);
-    }
-
-    /**
-     * Appends float value for the current column to the chunk.
-     *
-     * @param val Column value.
-     */
-    public void appendFloat(float val) {
-        checkType(NativeTypes.FLOAT);
-
-        buf.putFloat(curOff, val);
-
-        if (isKeyColumn())
-            keyHash += 31 * keyHash + Float.hashCode(val);
-
-        shiftColumn(NativeTypes.FLOAT);
-    }
-
-    /**
-     * Appends double value for the current column to the chunk.
-     *
-     * @param val Column value.
-     */
-    public void appendDouble(double val) {
-        checkType(NativeTypes.DOUBLE);
-
-        buf.putDouble(curOff, val);
-
-        if (isKeyColumn())
-            keyHash += 31 * keyHash + Double.hashCode(val);
-
-        shiftColumn(NativeTypes.DOUBLE);
-    }
-
-    /**
-     * Appends UUID value for the current column to the chunk.
-     *
-     * @param uuid Column value.
-     */
-    public void appendUuid(UUID uuid) {
-        checkType(NativeTypes.UUID);
-
-        buf.putLong(curOff, uuid.getLeastSignificantBits());
-        buf.putLong(curOff + 8, uuid.getMostSignificantBits());
-
-        if (isKeyColumn())
-            keyHash += 31 * keyHash + uuid.hashCode();
-
-        shiftColumn(NativeTypes.UUID);
-    }
-
-    /**
-     * Appends String value for the current column to the chunk.
-     *
-     * @param val Column value.
-     */
-    public void appendString(String val) {
-        checkType(NativeTypes.STRING);
-
-        try {
-            int written = buf.putString(curOff, val, encoder());
-
-            writeOffset(curVarlenTblEntry, curOff - baseOff);
-
-            if (isKeyColumn())
-                keyHash += 31 * keyHash + val.hashCode();
-
-            shiftColumn(written, true);
-        }
-        catch (CharacterCodingException e) {
-            throw new AssemblyException("Failed to encode string", e);
-        }
-    }
-
-    /**
-     * Appends byte[] value for the current column to the chunk.
-     *
-     * @param val Column value.
-     */
-    public void appendBytes(byte[] val) {
-        checkType(NativeTypes.BYTES);
-
-        buf.putBytes(curOff, val);
-
-        if (isKeyColumn())
-            keyHash += 31 * keyHash + Arrays.hashCode(val);
-
-        writeOffset(curVarlenTblEntry, curOff - baseOff);
-
-        shiftColumn(val.length, true);
-    }
-
-    /**
-     * Appends BitSet value for the current column to the chunk.
-     *
-     * @param bitSet Column value.
-     */
-    public void appendBitmask(BitSet bitSet) {
-        Column col = curCols.column(curCol);
-
-        checkType(NativeTypeSpec.BITMASK);
-
-        BitmaskNativeType maskType = (BitmaskNativeType)col.type();
-
-        if (bitSet.length() > maskType.bits())
-            throw new IllegalArgumentException("Failed to set bitmask for column '" + col.name() + "' " +
-                "(mask size exceeds allocated size) [mask=" + bitSet + ", maxSize=" + maskType.bits() + "]");
-
-        byte[] arr = bitSet.toByteArray();
-
-        buf.putBytes(curOff, arr);
-
-        for (int i = 0; i < maskType.sizeInBytes() - arr.length; i++)
-            buf.put(curOff + arr.length + i, (byte)0);
-
-        if (isKeyColumn())
-            keyHash += 31 * keyHash + Arrays.hashCode(arr);
-
-        shiftColumn(maskType);
-    }
-
-    /**
-     * @return Serialized row.
-     */
-    public byte[] build() {
-        if (schema.keyColumns() == curCols)
-            throw new AssemblyException("Key column missed: colIdx=" + curCol);
-        else {
-            if (curCol == 0)
-                flags |= RowFlags.NO_VALUE_FLAG;
-            else if (schema.valueColumns().length() != curCol)
-                throw new AssemblyException("Value column missed: colIdx=" + curCol);
-        }
-
-        buf.putShort(BinaryRow.FLAGS_FIELD_OFFSET, flags);
-        buf.putInt(BinaryRow.KEY_HASH_FIELD_OFFSET, keyHash);
-
-        return buf.toArray();
-    }
-
-    /**
-     * @return UTF-8 string encoder.
-     */
-    private CharsetEncoder encoder() {
-        if (strEncoder == null)
-            strEncoder = StandardCharsets.UTF_8.newEncoder();
-
-        return strEncoder;
-    }
-
-    /**
-     * Writes the given offset to the varlen table entry with the given index.
-     *
-     * @param tblEntryIdx Varlen table entry index.
-     * @param off Offset to write.
-     */
-    private void writeOffset(int tblEntryIdx, int off) {
-        assert (flags & (baseOff == BinaryRow.KEY_CHUNK_OFFSET ? OMIT_KEY_VARTBL_FLAG : OMIT_VAL_VARTBL_FLAG)) == 0 :
-            "Illegal writing of varlen when 'omit vartable' flag is set for a chunk.";
-
-        writeAdaptiveItem(varlenTblChunkOff + vartableItemOffset(tblEntryIdx), off);
-    }
-
-    private void writeAdaptiveItem(int off, int val) {
-        switch (adaptiveItemSize) {
-            case Byte.BYTES:
-                buf.put(off, (byte)val);
-
-                return;
-            case Short.BYTES:
-                buf.putShort(off, (short)val);
-
-                return;
-            default:
-                buf.putInt(off, val);
-        }
-    }
-
-    /**
-     * Checks that the type being appended matches the column type.
-     *
-     * @param type Type spec that is attempted to be appended.
-     */
-    private void checkType(NativeTypeSpec type) {
-        Column col = curCols.column(curCol);
-
-        if (col.type().spec() != type)
-            throw new IllegalArgumentException("Failed to set column (int was passed, but column is of different " +
-                "type): " + col);
-    }
-
-    /**
-     * Checks that the type being appended matches the column type.
-     *
-     * @param type Type that is attempted to be appended.
-     */
-    private void checkType(NativeType type) {
-        checkType(type.spec());
-    }
-
-    /**
-     * Sets null flag in the null map for the given column.
-     *
-     * @param colIdx Column index.
-     */
-    private void setNull(int colIdx) {
-        assert (flags & (baseOff == BinaryRow.KEY_CHUNK_OFFSET ? RowFlags.OMIT_KEY_NULL_MAP_FLAG : RowFlags.OMIT_VAL_NULL_MAP_FLAG)) == 0 :
-            "Illegal writing 'null' value when 'omit null-map' flag is set for a chunk.";
-
-        int byteInMap = colIdx / 8;
-        int bitInByte = colIdx % 8;
-
-        buf.ensureCapacity(nullMapOff + byteInMap + 1);
-
-        buf.put(nullMapOff + byteInMap, (byte)(buf.get(nullMapOff + byteInMap) | (1 << bitInByte)));
-    }
-
-    /**
-     * Must be called after an append of fixlen column.
-     *
-     * @param type Type of the appended column.
-     */
-    private void shiftColumn(NativeType type) {
-        assert type.spec().fixedLength() : "Varlen types should provide field length to shift column: " + type;
-
-        shiftColumn(type.sizeInBytes(), false);
-    }
-
-    /**
-     * Shifts current offsets and column indexes as necessary, also changes the chunk base offsets when
-     * moving from key to value columns.
-     *
-     * @param size Size of the appended column.
-     * @param varlen {@code true} if appended column was varlen.
-     */
-    private void shiftColumn(int size, boolean varlen) {
-        curCol++;
-        curOff += size;
-
-        if (varlen)
-            curVarlenTblEntry++;
-
-        if (curCol == curCols.length()) {
-            int chunkLen = curOff - baseOff;
-
-            writeAdaptiveItem(baseOff, chunkLen);
-
-            if (schema.valueColumns() == curCols)
-                return; // No more columns.
-
-            curCols = schema.valueColumns(); // Switch key->value columns.
-
-            initOffsets(baseOff + chunkLen, valOffSize, nonNullVarlenValCols);
-
-            if (nonNullVarlenValCols == 0)
-                flags |= OMIT_VAL_VARTBL_FLAG;
-            else
-                writeAdaptiveItem(varlenTblChunkOff, (short)nonNullVarlenValCols);
-        }
-    }
-
-    /**
-     * @param base Chunk base offset.
-     * @param nonNullVarlenCols Number of non-null varlen columns.
-     */
-    private void initOffsets(int base, int adaptiveItemSize, int nonNullVarlenCols) {
-        baseOff = base;
-
-        this.adaptiveItemSize = adaptiveItemSize;
-
-        curCol = 0;
-        curVarlenTblEntry = 0;
-
-        nullMapOff = baseOff + adaptiveItemSize /* chunk length. */;
-        varlenTblChunkOff = nullMapOff + curCols.nullMapSize();
-
-        curOff = varlenTblChunkOff + vartableSize(nonNullVarlenCols, adaptiveItemSize);
-    }
-
-    public int vartableItemOffset(int nonNullVarlenCols) {
-        return adaptiveItemSize + nonNullVarlenCols * adaptiveItemSize;
-    }
-
-    public static int vartableSize(int items, int itemSize) {
-        return items == 0 ? 0 : itemSize + items * itemSize;
-    }
-
-    /**
-     * @return {@code true} if current column is a key column, {@code false} otherwise.
-     */
-    private boolean isKeyColumn() {
-        return schema.keyColumns() == curCols;
-    }
-}
diff --git a/modules/schema/src/main/java/org/apache/ignite/internal/schema/marshaller/AbstractSerializer.java b/modules/schema/src/main/java/org/apache/ignite/internal/schema/marshaller/AbstractSerializer.java
index 0cd67ad..940e80a 100644
--- a/modules/schema/src/main/java/org/apache/ignite/internal/schema/marshaller/AbstractSerializer.java
+++ b/modules/schema/src/main/java/org/apache/ignite/internal/schema/marshaller/AbstractSerializer.java
@@ -19,8 +19,8 @@ package org.apache.ignite.internal.schema.marshaller;
 
 import java.util.Objects;
 import org.apache.ignite.internal.schema.ByteBufferRow;
-import org.apache.ignite.internal.schema.Row;
-import org.apache.ignite.internal.schema.RowAssembler;
+import org.apache.ignite.internal.schema.row.Row;
+import org.apache.ignite.internal.schema.row.RowAssembler;
 import org.apache.ignite.internal.schema.SchemaDescriptor;
 import org.apache.ignite.internal.util.Pair;
 import org.jetbrains.annotations.Nullable;
diff --git a/modules/schema/src/main/java/org/apache/ignite/internal/schema/marshaller/MarshallerUtil.java b/modules/schema/src/main/java/org/apache/ignite/internal/schema/marshaller/MarshallerUtil.java
index 6884d8f..265c3f9 100644
--- a/modules/schema/src/main/java/org/apache/ignite/internal/schema/marshaller/MarshallerUtil.java
+++ b/modules/schema/src/main/java/org/apache/ignite/internal/schema/marshaller/MarshallerUtil.java
@@ -21,7 +21,7 @@ import java.util.BitSet;
 import java.util.UUID;
 import org.apache.ignite.internal.schema.InvalidTypeException;
 import org.apache.ignite.internal.schema.NativeType;
-import org.apache.ignite.internal.schema.RowAssembler;
+import org.apache.ignite.internal.schema.row.RowAssembler;
 import org.apache.ignite.internal.util.ObjectFactory;
 
 /**
diff --git a/modules/schema/src/main/java/org/apache/ignite/internal/schema/marshaller/asm/AsmSerializerGenerator.java b/modules/schema/src/main/java/org/apache/ignite/internal/schema/marshaller/asm/AsmSerializerGenerator.java
index 14712b3..d4e2425 100644
--- a/modules/schema/src/main/java/org/apache/ignite/internal/schema/marshaller/asm/AsmSerializerGenerator.java
+++ b/modules/schema/src/main/java/org/apache/ignite/internal/schema/marshaller/asm/AsmSerializerGenerator.java
@@ -40,8 +40,8 @@ import jdk.jfr.Experimental;
 import org.apache.ignite.internal.schema.Column;
 import org.apache.ignite.internal.schema.Columns;
 import org.apache.ignite.internal.schema.NativeType;
-import org.apache.ignite.internal.schema.Row;
-import org.apache.ignite.internal.schema.RowAssembler;
+import org.apache.ignite.internal.schema.row.Row;
+import org.apache.ignite.internal.schema.row.RowAssembler;
 import org.apache.ignite.internal.schema.SchemaDescriptor;
 import org.apache.ignite.internal.schema.marshaller.AbstractSerializer;
 import org.apache.ignite.internal.schema.marshaller.BinaryMode;
@@ -246,8 +246,6 @@ public class AsmSerializerGenerator implements SerializerFactory {
 
         final Variable varlenKeyCols = scope.declareVariable("varlenKeyCols", body, BytecodeExpressions.defaultValue(int.class));
         final Variable varlenValueCols = scope.declareVariable("varlenValueCols", body, BytecodeExpressions.defaultValue(int.class));
-        final Variable varlenKeyColsSize = scope.declareVariable("varlenKeyColsSize", body, BytecodeExpressions.defaultValue(int.class));
-        final Variable varlenValueColsSize = scope.declareVariable("varlenValueColsSize", body, BytecodeExpressions.defaultValue(int.class));
 
         final Variable keyCols = scope.declareVariable(Columns.class, "keyCols");
         final Variable valCols = scope.declareVariable(Columns.class, "valCols");
@@ -268,13 +266,7 @@ public class AsmSerializerGenerator implements SerializerFactory {
 
                 body.append(keyMarsh.getValue(classDef.getType(), scope.getVariable("key"), i)).putVariable(tmp);
                 body.append(new IfStatement().condition(BytecodeExpressions.isNotNull(tmp)).ifTrue(
-                    new BytecodeBlock()
-                        .append(varlenKeyCols.increment())
-                        .append(BytecodeExpressions.add(
-                            varlenKeyColsSize,
-                            getColumnValueSize(tmp, keyCols, i))
-                        )
-                        .putVariable(varlenKeyColsSize))
+                    new BytecodeBlock().append(varlenKeyCols.increment()))
                 );
             }
         }
@@ -288,22 +280,13 @@ public class AsmSerializerGenerator implements SerializerFactory {
 
                 body.append(valMarsh.getValue(classDef.getType(), scope.getVariable("val"), i)).putVariable(tmp);
                 body.append(new IfStatement().condition(BytecodeExpressions.isNotNull(tmp)).ifTrue(
-                    new BytecodeBlock()
-                        .append(varlenValueCols.increment())
-                        .append(BytecodeExpressions.add(
-                            varlenValueColsSize,
-                            getColumnValueSize(tmp, valCols, i))
-                        )
-                        .putVariable(varlenValueColsSize))
+                    new BytecodeBlock().append(varlenValueCols.increment()))
                 );
             }
         }
 
         body.append(BytecodeExpressions.newInstance(RowAssembler.class,
             methodDef.getThis().getField("schema", SchemaDescriptor.class),
-            BytecodeExpressions.invokeStatic(RowAssembler.class, "rowSize", int.class,
-                keyCols, varlenKeyCols, varlenKeyColsSize,
-                valCols, varlenValueCols, varlenValueColsSize),
             varlenKeyCols,
             varlenValueCols));
 
diff --git a/modules/schema/src/main/java/org/apache/ignite/internal/schema/marshaller/reflection/FieldAccessor.java b/modules/schema/src/main/java/org/apache/ignite/internal/schema/marshaller/reflection/FieldAccessor.java
index 017b7be..ce16e7d 100644
--- a/modules/schema/src/main/java/org/apache/ignite/internal/schema/marshaller/reflection/FieldAccessor.java
+++ b/modules/schema/src/main/java/org/apache/ignite/internal/schema/marshaller/reflection/FieldAccessor.java
@@ -25,8 +25,8 @@ import java.util.Objects;
 import java.util.UUID;
 import org.apache.ignite.internal.schema.Column;
 import org.apache.ignite.internal.schema.Columns;
-import org.apache.ignite.internal.schema.Row;
-import org.apache.ignite.internal.schema.RowAssembler;
+import org.apache.ignite.internal.schema.row.Row;
+import org.apache.ignite.internal.schema.row.RowAssembler;
 import org.apache.ignite.internal.schema.marshaller.BinaryMode;
 import org.apache.ignite.internal.schema.marshaller.MarshallerUtil;
 import org.apache.ignite.internal.schema.marshaller.SerializationException;
diff --git a/modules/schema/src/main/java/org/apache/ignite/internal/schema/marshaller/reflection/JavaSerializer.java b/modules/schema/src/main/java/org/apache/ignite/internal/schema/marshaller/reflection/JavaSerializer.java
index a36ef94..9e98d30 100644
--- a/modules/schema/src/main/java/org/apache/ignite/internal/schema/marshaller/reflection/JavaSerializer.java
+++ b/modules/schema/src/main/java/org/apache/ignite/internal/schema/marshaller/reflection/JavaSerializer.java
@@ -18,8 +18,8 @@
 package org.apache.ignite.internal.schema.marshaller.reflection;
 
 import org.apache.ignite.internal.schema.Columns;
-import org.apache.ignite.internal.schema.Row;
-import org.apache.ignite.internal.schema.RowAssembler;
+import org.apache.ignite.internal.schema.row.Row;
+import org.apache.ignite.internal.schema.row.RowAssembler;
 import org.apache.ignite.internal.schema.SchemaDescriptor;
 import org.apache.ignite.internal.schema.marshaller.AbstractSerializer;
 import org.apache.ignite.internal.schema.marshaller.SerializationException;
@@ -85,11 +85,7 @@ public class JavaSerializer extends AbstractSerializer {
         ObjectStatistic keyStat = collectObjectStats(schema.keyColumns(), keyMarsh, key);
         ObjectStatistic valStat = collectObjectStats(schema.valueColumns(), valMarsh, val);
 
-        int size = RowAssembler.rowSize(
-            schema.keyColumns(), keyStat.nonNullCols, keyStat.nonNullColsSize,
-            schema.valueColumns(), valStat.nonNullCols, valStat.nonNullColsSize);
-
-        return new RowAssembler(schema, size, 2, keyStat.nonNullCols, 2, valStat.nonNullCols);
+        return new RowAssembler(schema, keyStat.nonNullCols, valStat.nonNullCols);
     }
 
     /**
diff --git a/modules/schema/src/main/java/org/apache/ignite/internal/schema/marshaller/reflection/Marshaller.java b/modules/schema/src/main/java/org/apache/ignite/internal/schema/marshaller/reflection/Marshaller.java
index 874ace6..e59f438 100644
--- a/modules/schema/src/main/java/org/apache/ignite/internal/schema/marshaller/reflection/Marshaller.java
+++ b/modules/schema/src/main/java/org/apache/ignite/internal/schema/marshaller/reflection/Marshaller.java
@@ -20,8 +20,8 @@ package org.apache.ignite.internal.schema.marshaller.reflection;
 import java.util.Objects;
 import org.apache.ignite.internal.schema.Column;
 import org.apache.ignite.internal.schema.Columns;
-import org.apache.ignite.internal.schema.Row;
-import org.apache.ignite.internal.schema.RowAssembler;
+import org.apache.ignite.internal.schema.row.Row;
+import org.apache.ignite.internal.schema.row.RowAssembler;
 import org.apache.ignite.internal.schema.marshaller.BinaryMode;
 import org.apache.ignite.internal.schema.marshaller.MarshallerUtil;
 import org.apache.ignite.internal.schema.marshaller.SerializationException;
diff --git a/modules/schema/src/main/java/org/apache/ignite/internal/schema/row/AbstractChunkWriter.java b/modules/schema/src/main/java/org/apache/ignite/internal/schema/row/AbstractChunkWriter.java
new file mode 100644
index 0000000..7ab7787
--- /dev/null
+++ b/modules/schema/src/main/java/org/apache/ignite/internal/schema/row/AbstractChunkWriter.java
@@ -0,0 +1,228 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.schema.row;
+
+import java.nio.charset.CharacterCodingException;
+import java.nio.charset.CharsetEncoder;
+import java.util.BitSet;
+import java.util.UUID;
+import org.apache.ignite.internal.schema.AssemblyException;
+import org.apache.ignite.internal.schema.NativeTypes;
+
+/**
+ * Abstract row chunk writer.
+ */
+abstract class AbstractChunkWriter {
+    /** Chunk buffer. */
+    protected final ExpandableByteBuf buf;
+
+    /** Base offset of the chunk */
+    protected final int baseOff;
+
+    /** Offset of the null map for the chunk. */
+    protected final int nullMapOff;
+
+    /** Offset of the varlen table for the chunk. */
+    protected final int varTblOff;
+
+    /** Offset of data for the chunk. */
+    protected final int dataOff;
+
+    /** Index of the current varlen table entry. Incremented each time non-null varlen column is appended. */
+    protected int curVartblItem;
+
+    /** Current offset for the next column to be appended. */
+    protected int curOff;
+
+    /**
+     * @param buf Row buffer.
+     * @param baseOff Chunk base offset.
+     * @param nullMapOff Null-map offset.
+     * @param varTblOff Vartable offset.
+     */
+    protected AbstractChunkWriter(ExpandableByteBuf buf, int baseOff, int nullMapOff, int varTblOff, int dataOff) {
+        this.buf = buf;
+        this.baseOff = baseOff;
+        this.nullMapOff = nullMapOff;
+        this.varTblOff = varTblOff;
+        this.dataOff = dataOff;
+        curOff = dataOff;
+        curVartblItem = 0;
+    }
+
+    /**
+     * Appends byte value for the current column to the chunk.
+     *
+     * @param val Column value.
+     */
+    public void appendByte(byte val) {
+        buf.put(curOff, val);
+
+        curOff += NativeTypes.BYTE.sizeInBytes();
+    }
+
+    /**
+     * Appends short value for the current column to the chunk.
+     *
+     * @param val Column value.
+     */
+    public void appendShort(short val) {
+        buf.putShort(curOff, val);
+
+        curOff += NativeTypes.SHORT.sizeInBytes();
+    }
+
+    /**
+     * Appends int value for the current column to the chunk.
+     *
+     * @param val Column value.
+     */
+    public void appendInt(int val) {
+        buf.putInt(curOff, val);
+
+        curOff += NativeTypes.INTEGER.sizeInBytes();
+    }
+
+    /**
+     * Appends long value for the current column to the chunk.
+     *
+     * @param val Column value.
+     */
+    public void appendLong(long val) {
+        buf.putLong(curOff, val);
+
+        curOff += NativeTypes.LONG.sizeInBytes();
+    }
+
+    /**
+     * Appends float value for the current column to the chunk.
+     *
+     * @param val Column value.
+     */
+    public void appendFloat(float val) {
+        buf.putFloat(curOff, val);
+
+        curOff += NativeTypes.FLOAT.sizeInBytes();
+    }
+
+    /**
+     * Appends double value for the current column to the chunk.
+     *
+     * @param val Column value.
+     */
+    public void appendDouble(double val) {
+        buf.putDouble(curOff, val);
+
+        curOff += NativeTypes.DOUBLE.sizeInBytes();
+    }
+
+    /**
+     * Appends UUID value for the current column to the chunk.
+     *
+     * @param uuid Column value.
+     */
+    public void appendUuid(UUID uuid) {
+        buf.putLong(curOff, uuid.getLeastSignificantBits());
+        buf.putLong(curOff + 8, uuid.getMostSignificantBits());
+
+        curOff += NativeTypes.UUID.sizeInBytes();
+    }
+
+    /**
+     * Appends String value for the current column to the chunk.
+     *
+     * @param val Column value.
+     */
+    public void appendString(String val, CharsetEncoder encoder) {
+        try {
+            int written = buf.putString(curOff, val, encoder);
+
+            writeOffset(curVartblItem, curOff - dataOff);
+
+            curVartblItem++;
+            curOff += written;
+        }
+        catch (CharacterCodingException e) {
+            throw new AssemblyException("Failed to encode string", e);
+        }
+    }
+
+    /**
+     * Appends byte[] value for the current column to the chunk.
+     *
+     * @param val Column value.
+     */
+    public void appendBytes(byte[] val) {
+        buf.putBytes(curOff, val);
+
+        writeOffset(curVartblItem, curOff - dataOff);
+
+        curVartblItem++;
+        curOff += val.length;
+    }
+
+    /**
+     * Appends BitSet value for the current column to the chunk.
+     *
+     * @param bitSet Column value.
+     */
+    public void appendBitmask(BitSet bitSet, int size) {
+        byte[] arr = bitSet.toByteArray();
+
+        buf.putBytes(curOff, arr);
+
+        for (int i = 0; i < size - arr.length; i++)
+            buf.put(curOff + arr.length + i, (byte)0);
+
+        curOff += size;
+    }
+
+    /**
+     * @return Chunk size in bytes.
+     */
+    public int chunkLength() {
+        return curOff - baseOff;
+    }
+
+    /**
+     * Post-write action.
+     */
+    abstract void flush();
+
+    /**
+     * Writes the given offset to the varlen table entry with the given index.
+     *
+     * @param tblEntryIdx Varlen table entry index.
+     * @param off Offset to write.
+     */
+    protected abstract void writeOffset(int tblEntryIdx, int off);
+
+    /**
+     * Sets null flag in the null map for the given column.
+     *
+     * @param colIdx Column index.
+     */
+    protected void setNull(int colIdx) {
+        int byteInMap = colIdx / 8;
+        int bitInByte = colIdx % 8;
+
+        buf.ensureCapacity(nullMapOff + byteInMap + 1);
+
+        buf.put(nullMapOff + byteInMap, (byte)(buf.get(nullMapOff + byteInMap) | (1 << bitInByte)));
+    }
+}
diff --git a/modules/schema/src/main/java/org/apache/ignite/internal/schema/ExpandableByteBuf.java b/modules/schema/src/main/java/org/apache/ignite/internal/schema/row/ExpandableByteBuf.java
similarity index 92%
rename from modules/schema/src/main/java/org/apache/ignite/internal/schema/ExpandableByteBuf.java
rename to modules/schema/src/main/java/org/apache/ignite/internal/schema/row/ExpandableByteBuf.java
index d74d719..96401b3 100644
--- a/modules/schema/src/main/java/org/apache/ignite/internal/schema/ExpandableByteBuf.java
+++ b/modules/schema/src/main/java/org/apache/ignite/internal/schema/row/ExpandableByteBuf.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.ignite.internal.schema;
+package org.apache.ignite.internal.schema.row;
 
 import java.nio.ByteBuffer;
 import java.nio.ByteOrder;
@@ -24,6 +24,7 @@ import java.nio.charset.CharacterCodingException;
 import java.nio.charset.CharsetEncoder;
 import java.nio.charset.CoderResult;
 import java.util.Arrays;
+import org.apache.ignite.internal.util.Constants;
 
 /**
  * A simple byte array wrapper to allow dynamic byte array expansion during the row construction. Grows exponentially
@@ -46,16 +47,13 @@ import java.util.Arrays;
  */
 @SuppressWarnings("AssignmentOrReturnOfFieldWithMutableType")
 public class ExpandableByteBuf {
-    /** */
-    private static final int MB = 1024 * 1024;
-
-    /** */
+    /** Buffer array. */
     private byte[] arr;
 
-    /** */
+    /** Wrapped array. */
     private ByteBuffer buf;
 
-    /** */
+    /** Written bytes. */
     private int len;
 
     /**
@@ -189,7 +187,7 @@ public class ExpandableByteBuf {
                     break;
 
                 if (cr.isOverflow()) {
-                    expand(len + 1);
+                    expand(len + (int)encoder.maxBytesPerChar());
 
                     continue;
                 }
@@ -205,7 +203,7 @@ public class ExpandableByteBuf {
                 len = buf.position();
 
                 if (cr.isOverflow()) {
-                    expand(len + 1);
+                    expand(len + (int)encoder.maxBytesPerChar());
 
                     continue;
                 }
@@ -264,13 +262,13 @@ public class ExpandableByteBuf {
         int l = arr.length;
 
         while (l < cap) {
-            if (l < MB)
+            if (l < Constants.MiB)
                 l *= 2;
             else
-                l += MB;
+                l += Constants.MiB;
         }
 
-        byte[] tmp = new byte[cap];
+        byte[] tmp = new byte[l];
 
         System.arraycopy(arr, 0, tmp, 0, arr.length);
 
@@ -280,4 +278,13 @@ public class ExpandableByteBuf {
         buf.position(oldPos);
         buf.order(ByteOrder.LITTLE_ENDIAN);
     }
+
+    /**
+     * Unwrap to ByteBuffer.
+     *
+     * @return Byte buffer.
+     */
+    ByteBuffer unwrap() {
+        return buf;
+    }
 }
diff --git a/modules/schema/src/main/java/org/apache/ignite/internal/schema/row/LongChunkWriter.java b/modules/schema/src/main/java/org/apache/ignite/internal/schema/row/LongChunkWriter.java
new file mode 100644
index 0000000..2c75d37
--- /dev/null
+++ b/modules/schema/src/main/java/org/apache/ignite/internal/schema/row/LongChunkWriter.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.schema.row;
+
+/**
+ * Row chunk writer for long key/value chunks.
+ *
+ * Uses {@code int} values for coding sizes/offsets,
+ * supports chunks with payload up to 2 GiB.
+ */
+class LongChunkWriter extends AbstractChunkWriter {
+    /**
+     * Calculates vartable length (in bytes).
+     *
+     * @param items Vartable items.
+     * @return Vartable size in bytes.
+     */
+    static int vartableLength(int items) {
+        return items == 0 ? 0 : Integer.BYTES /* Table size */ + items * Integer.BYTES;
+    }
+
+    /**
+     * Creates chunk writer for long chunk format.
+     *
+     * @param buf Row buffer.
+     * @param baseOff Chunk base offset.
+     * @param nullMapLen Null-map size in bytes.
+     * @param vartblSize Amount of vartable items.
+     */
+    LongChunkWriter(ExpandableByteBuf buf, int baseOff, int nullMapLen, int vartblSize) {
+        super(
+            buf,
+            baseOff,
+            baseOff + Integer.BYTES /* Chunk size */,
+            baseOff + Integer.BYTES /* Chunk size */ + nullMapLen,
+            baseOff + Integer.BYTES /* Chunk size */ + nullMapLen + vartableLength(vartblSize));
+
+        curVartblItem = 0;
+    }
+
+    /** {@inheritDoc} */
+    @Override void flush() {
+        final int size = chunkLength();
+
+        assert size > 0 : "Size field value overflow: " + size;
+        assert varTblOff + vartableLength(curVartblItem) == dataOff : "Vartable underflowed.";
+
+        buf.putInt(baseOff, size);
+
+        if (curVartblItem > 0)
+            buf.putInt(varTblOff, curVartblItem);
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void writeOffset(int tblEntryIdx, int off) {
+        final int itemOff = varTblOff + Integer.BYTES + tblEntryIdx * Integer.BYTES;
+
+        assert off >= 0 : "Varlen offset overflow: offset=" + off;
+        assert itemOff < dataOff : "Vartable overflow: size=" + itemOff;
+
+        buf.putInt(itemOff, off);
+    }
+}
diff --git a/modules/schema/src/main/java/org/apache/ignite/internal/schema/Row.java b/modules/schema/src/main/java/org/apache/ignite/internal/schema/row/Row.java
similarity index 58%
rename from modules/schema/src/main/java/org/apache/ignite/internal/schema/Row.java
rename to modules/schema/src/main/java/org/apache/ignite/internal/schema/row/Row.java
index 4ed78b0..15cb69e 100644
--- a/modules/schema/src/main/java/org/apache/ignite/internal/schema/Row.java
+++ b/modules/schema/src/main/java/org/apache/ignite/internal/schema/row/Row.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.ignite.internal.schema;
+package org.apache.ignite.internal.schema.row;
 
 import java.io.IOException;
 import java.io.OutputStream;
@@ -23,6 +23,13 @@ import java.math.BigDecimal;
 import java.nio.ByteBuffer;
 import java.util.BitSet;
 import java.util.UUID;
+import org.apache.ignite.internal.schema.BinaryRow;
+import org.apache.ignite.internal.schema.Column;
+import org.apache.ignite.internal.schema.Columns;
+import org.apache.ignite.internal.schema.InvalidTypeException;
+import org.apache.ignite.internal.schema.NativeTypeSpec;
+import org.apache.ignite.internal.schema.SchemaDescriptor;
+import org.jetbrains.annotations.NotNull;
 
 /**
  * Schema-aware row.
@@ -38,6 +45,12 @@ public class Row implements BinaryRow {
     /** Binary row. */
     private final BinaryRow row;
 
+    /** Key reader. */
+    private final AbstractChunkReader keyReader;
+
+    /** Value reader. */
+    private final AbstractChunkReader valReader;
+
     /**
      * Constructor.
      *
@@ -49,14 +62,36 @@ public class Row implements BinaryRow {
 
         this.row = row;
         this.schema = schema;
+
+        final short flags = readShort(FLAGS_FIELD_OFFSET);
+
+        keyReader = createReader(KEY_CHUNK_OFFSET,
+            (flags & RowFlags.KEY_TYNY_FORMAT) != 0,
+            (flags & RowFlags.OMIT_KEY_NULL_MAP_FLAG) == 0 ? schema.keyColumns().nullMapSize() : 0,
+            (flags & RowFlags.OMIT_KEY_VARTBL_FLAG) == 0);
+
+        valReader = ((flags & RowFlags.NO_VALUE_FLAG) == 0) ?
+            createReader(
+                KEY_CHUNK_OFFSET + keyReader.chunkLength(),
+                (flags & RowFlags.VAL_TYNY_FORMAT) != 0,
+                (flags & RowFlags.OMIT_VAL_NULL_MAP_FLAG) == 0 ? schema.valueColumns().nullMapSize() : 0,
+                (flags & RowFlags.OMIT_VAL_VARTBL_FLAG) == 0) :
+            null;
     }
 
     /**
-     * @param itemIdx Varlen table item index.
-     * @return Varlen item offset.
+     * Chunk reader factory method.
+     *
+     * @param baseOff Chunk base offset.
+     * @param isSmallChunk Small chunk format flag.
+     * @param nullMapLen Null-map length.
+     * @param hasVarTable Vartable presense flag.
+     * @return Chunk reader.
      */
-    private int varlenItemOffset(int itemIdx, int itemSize) {
-        return itemSize + itemIdx * itemSize;
+    @NotNull private AbstractChunkReader createReader(int baseOff, boolean isSmallChunk, int nullMapLen, boolean hasVarTable) {
+        return isSmallChunk ?
+            new TinyChunkReader(baseOff, nullMapLen, hasVarTable) :
+            new LargeChunkReader(baseOff, nullMapLen, hasVarTable);
     }
 
     /**
@@ -343,43 +378,29 @@ public class Row implements BinaryRow {
      */
     protected long findColumn(int colIdx, NativeTypeSpec type) throws InvalidTypeException {
         // Get base offset (key start or value start) for the given column.
-        boolean keyCol = schema.isKeyColumn(colIdx);
+        boolean isKeyCol = schema.isKeyColumn(colIdx);
 
-        final short flags = readShort(FLAGS_FIELD_OFFSET);
-
-        int size = keyCol ?
-            ((flags & RowFlags.KEY_TYNY_FORMAT) != 0) ? 1 : (flags & RowFlags.KEY_LARGE_ROW_FORMAT) != 0 ? 4 : 2 :
-            (flags & RowFlags.VAL_TYNY_FORMAT) != 0 ? 1 : (flags & RowFlags.VAL_LARGE_FORMAT) != 0 ? 4 : 2;
-
-        int off = KEY_CHUNK_OFFSET;
-
-        if (!keyCol) {
-            assert (flags & RowFlags.NO_VALUE_FLAG) == 0;
-
-            // Jump to the next chunk, the size of the first chunk is written at the chunk start.
-            off += readAdaptiveSizedItem(off, size);
-
-            // Adjust the column index according to the number of key columns.
+        // Adjust the column index according to the number of key columns.
+        if (!isKeyCol)
             colIdx -= schema.keyColumns().length();
-        }
 
-        Columns cols = keyCol ? schema.keyColumns() : schema.valueColumns();
+        AbstractChunkReader reader = isKeyCol ? keyReader : valReader;
+        Columns cols = isKeyCol ? schema.keyColumns() : schema.valueColumns();
 
         if (cols.column(colIdx).type().spec() != type)
             throw new InvalidTypeException("Invalid column type requested [requested=" + type +
                 ", column=" + cols.column(colIdx) + ']');
 
-        boolean hasVarTable = ((keyCol ? RowFlags.OMIT_KEY_VARTBL_FLAG : RowFlags.OMIT_VAL_VARTBL_FLAG) & flags) == 0;
-        boolean hasNullMap = ((keyCol ? RowFlags.OMIT_KEY_NULL_MAP_FLAG : RowFlags.OMIT_VAL_NULL_MAP_FLAG) & flags) == 0;
+        assert reader != null;
 
-        if (hasNullMap && isNull(off + size, colIdx))
+        if (reader.isNull(colIdx))
             return -1;
 
-        assert hasVarTable || type.fixedLength();
+        assert reader.hasVartable() || type.fixedLength();
 
         return type.fixedLength() ?
-            fixlenColumnOffset(cols, off, colIdx, hasVarTable, hasNullMap, size) :
-            varlenColumnOffsetAndLength(cols, off, colIdx, hasNullMap, size);
+            reader.fixlenColumnOffset(cols, colIdx) :
+            reader.varlenColumnOffsetAndLength(cols, colIdx);
     }
 
     /**
@@ -393,22 +414,6 @@ public class Row implements BinaryRow {
     }
 
     /**
-     * Checks the row's null map for the given column index in the chunk.
-     *
-     * @param nullMapOff Null map offset
-     * @param idx Offset of the column in the chunk.
-     * @return {@code true} if the column value is {@code null}.
-     */
-    private boolean isNull(int nullMapOff, int idx) {
-        int nullByte = idx / 8;
-        int posInByte = idx % 8;
-
-        int map = readByte(nullMapOff + nullByte);
-
-        return (map & (1 << posInByte)) != 0;
-    }
-
-    /**
      * Utility method to extract the column offset from the {@link #findColumn(int, NativeTypeSpec)} result. The
      * offset is calculated from the beginning of the row.
      *
@@ -430,131 +435,6 @@ public class Row implements BinaryRow {
         return (int)(offLen >>> 32);
     }
 
-    /**
-     * Calculates the offset and length of varlen column. First, it calculates the number of non-null columns
-     * preceding the requested column by folding the null map bits. This number is used to adjust the column index
-     * and find the corresponding entry in the varlen table. The length of the column is calculated either by
-     * subtracting two adjacent varlen table offsets, or by subtracting the last varlen table offset from the chunk
-     * length.
-     *
-     * @param cols Columns chunk.
-     * @param baseOff Chunk base offset.
-     * @param idx Column index in the chunk.
-     * @param hasNullMap Has null map flag.
-     * @param adaptiveItemSize
-     * @return Encoded offset (from the row start) and length of the column with the given index.
-     */
-    private long varlenColumnOffsetAndLength(Columns cols, int baseOff, int idx, boolean hasNullMap,
-        int adaptiveItemSize) {
-        int vartableOff = baseOff + adaptiveItemSize /* Chunk len field size. */;
-
-        int numNullsBefore = 0;
-
-        if (hasNullMap) {
-            vartableOff += cols.nullMapSize();
-
-            int nullMapOff = baseOff + adaptiveItemSize;
-
-            int nullStartByte = cols.firstVarlengthColumn() / 8;
-            int startBitInByte = cols.firstVarlengthColumn() % 8;
-
-            int nullEndByte = idx / 8;
-            int endBitInByte = idx % 8;
-
-            for (int i = nullStartByte; i <= nullEndByte; i++) {
-                byte nullmapByte = readByte(nullMapOff + i);
-
-                if (i == nullStartByte)
-                    // We need to clear startBitInByte least significant bits
-                    nullmapByte &= (0xFF << startBitInByte);
-
-                if (i == nullEndByte)
-                    // We need to clear 8-endBitInByte most significant bits
-                    nullmapByte &= (0xFF >> (8 - endBitInByte));
-
-                numNullsBefore += Columns.numberOfNullColumns(nullmapByte);
-            }
-        }
-
-        idx -= cols.numberOfFixsizeColumns() + numNullsBefore;
-        int vartableSize = readAdaptiveSizedItem(vartableOff, adaptiveItemSize);
-
-        // Offset of idx-th column is from base offset.
-        int resOff = readAdaptiveSizedItem(vartableOff + varlenItemOffset(idx, adaptiveItemSize), adaptiveItemSize);
-
-        long len = (idx == vartableSize - 1) ?
-            // totalLength - columnStartOffset
-            readAdaptiveSizedItem(baseOff, adaptiveItemSize) - resOff :
-            // nextColumnStartOffset - columnStartOffset
-            readAdaptiveSizedItem(vartableOff + varlenItemOffset(idx + 1, adaptiveItemSize), adaptiveItemSize) - resOff;
-
-        return (len << 32) | (resOff + baseOff);
-    }
-
-    /**
-     * Calculates the offset of the fixlen column with the given index in the row. It essentially folds the null map
-     * with the column lengths to calculate the size of non-null columns preceding the requested column.
-     *
-     * @param cols Columns chunk.
-     * @param baseOff Chunk base offset.
-     * @param idx Column index in the chunk.
-     * @param hasVarTbl Has varlen table flag.
-     * @param hasNullMap Has null map flag.
-     * @param adaptiveItemSize
-     * @return Encoded offset (from the row start) of the requested fixlen column.
-     */
-    int fixlenColumnOffset(Columns cols, int baseOff, int idx, boolean hasVarTbl, boolean hasNullMap,
-        int adaptiveItemSize) {
-        int colOff = 0;
-
-        int payloadOff = baseOff + adaptiveItemSize /* Chunk len field size. */;
-
-        // Calculate fixlen column offset.
-        {
-            int colByteIdx = idx / 8;
-
-            // Set bits starting from posInByte, inclusive, up to either the end of the byte or the last column index, inclusive
-            int startBit = idx % 8;
-            int endBit = colByteIdx == (cols.length() + 7) / 8 - 1 ? ((cols.numberOfFixsizeColumns() - 1) % 8) : 7;
-            int mask = (0xFF >> (7 - endBit)) & (0xFF << startBit);
-
-            if (hasNullMap) {
-                payloadOff += cols.nullMapSize();
-
-                // Fold offset based on the whole map bytes in the schema
-                for (int i = 0; i < colByteIdx; i++)
-                    colOff += cols.foldFixedLength(i, readByte(baseOff + adaptiveItemSize + i));
-
-                colOff += cols.foldFixedLength(colByteIdx, readByte(baseOff + adaptiveItemSize + colByteIdx) | mask);
-            }
-            else {
-                for (int i = 0; i < colByteIdx; i++)
-                    colOff += cols.foldFixedLength(i, 0);
-
-                colOff += cols.foldFixedLength(colByteIdx, mask);
-            }
-        }
-
-        if (hasVarTbl) {
-            int verlenItems = readAdaptiveSizedItem(payloadOff, adaptiveItemSize);
-
-            payloadOff += varlenItemOffset(verlenItems, adaptiveItemSize);
-        }
-
-        return payloadOff + colOff;
-    }
-
-    public int readAdaptiveSizedItem(int off, int size) {
-        switch (size) {
-            case Byte.BYTES:
-                return row.readByte(off);
-            case Short.BYTES:
-                return row.readShort(off);
-            default:
-                return row.readInteger(off);
-        }
-    }
-
     /** {@inheritDoc} */
     @Override public int schemaVersion() {
         return row.schemaVersion();
@@ -619,4 +499,235 @@ public class Row implements BinaryRow {
     @Override public byte[] readBytes(int off, int len) {
         return row.readBytes(off, len);
     }
+
+    /**
+     * Abstract chunk reader.
+     */
+    abstract class AbstractChunkReader {
+        /** Base offset. */
+        protected final int baseOff;
+
+        /** Null-map offset. */
+        protected int nullMapOff;
+
+        /** Vartable offset. */
+        protected int varTableOff;
+
+        /** Payload offset. */
+        protected int dataOff;
+
+        /**
+         * @param baseOff Chunk base offset.
+         */
+        AbstractChunkReader(int baseOff) {
+            this.baseOff = baseOff;
+        }
+
+        /**
+         * @return Chunk length in bytes
+         */
+        abstract int chunkLength();
+
+        /**
+         * @return Number of items in vartable.
+         */
+        abstract int vartableItems();
+
+        /**
+         * Checks the row's null map for the given column index in the chunk.
+         *
+         * @param idx Offset of the column in the chunk.
+         * @return {@code true} if the column value is {@code null}.
+         */
+        /** {@inheritDoc} */
+        protected boolean isNull(int idx) {
+            if (!hasNullmap())
+                return false;
+
+            int nullByte = idx / 8;
+            int posInByte = idx % 8;
+
+            int map = readByte(nullMapOff + nullByte);
+
+            return (map & (1 << posInByte)) != 0;
+        }
+
+        /**
+         * @return {@code True} if chunk has vartable.
+         */
+        protected boolean hasVartable() {
+            return dataOff > varTableOff;
+        }
+
+        /**
+         * @return {@code True} if chunk has nullmap.
+         */
+        protected boolean hasNullmap() {
+            return varTableOff > nullMapOff;
+        }
+
+        /**
+         * @param itemIdx Varlen table item index.
+         * @return Varlen item offset.
+         */
+        protected abstract int varlenItemOffset(int itemIdx);
+
+        /**
+         * Calculates the offset of the fixlen column with the given index in the row. It essentially folds the null map
+         * with the column lengths to calculate the size of non-null columns preceding the requested column.
+         *
+         * @param cols Columns chunk.
+         * @param idx Column index in the chunk.
+         * @return Encoded offset (from the row start) of the requested fixlen column.
+         */
+        int fixlenColumnOffset(Columns cols, int idx) {
+            int colOff = 0;
+
+            // Calculate fixlen column offset.
+            {
+                int colByteIdx = idx / 8;
+
+                // Set bits starting from posInByte, inclusive, up to either the end of the byte or the last column index, inclusive
+                int startBit = idx % 8;
+                int endBit = colByteIdx == (cols.length() + 7) / 8 - 1 ? ((cols.numberOfFixsizeColumns() - 1) % 8) : 7;
+                int mask = (0xFF >> (7 - endBit)) & (0xFF << startBit);
+
+                if (hasNullmap()) {
+                    // Fold offset based on the whole map bytes in the schema
+                    for (int i = 0; i < colByteIdx; i++)
+                        colOff += cols.foldFixedLength(i, readByte(nullMapOff + i));
+
+                    colOff += cols.foldFixedLength(colByteIdx, readByte(nullMapOff + colByteIdx) | mask);
+                }
+                else {
+                    for (int i = 0; i < colByteIdx; i++)
+                        colOff += cols.foldFixedLength(i, 0);
+
+                    colOff += cols.foldFixedLength(colByteIdx, mask);
+                }
+            }
+
+            return dataOff + colOff;
+        }
+
+        /**
+         * Calculates the offset and length of varlen column. First, it calculates the number of non-null columns
+         * preceding the requested column by folding the null map bits. This number is used to adjust the column index
+         * and find the corresponding entry in the varlen table. The length of the column is calculated either by
+         * subtracting two adjacent varlen table offsets, or by subtracting the last varlen table offset from the chunk
+         * length.
+         *
+         * @param cols Columns chunk.
+         * @param idx Column index in the chunk.
+         * @return Encoded offset (from the row start) and length of the column with the given index.
+         */
+        long varlenColumnOffsetAndLength(Columns cols, int idx) {
+            assert hasVartable() : "Chunk has no vartable: colId=" + idx;
+
+            if (hasNullmap()) {
+                int nullStartByte = cols.firstVarlengthColumn() / 8;
+                int startBitInByte = cols.firstVarlengthColumn() % 8;
+
+                int nullEndByte = idx / 8;
+                int endBitInByte = idx % 8;
+
+                int numNullsBefore = 0;
+
+                for (int i = nullStartByte; i <= nullEndByte; i++) {
+                    byte nullmapByte = readByte(nullMapOff + i);
+
+                    if (i == nullStartByte)
+                        // We need to clear startBitInByte least significant bits
+                        nullmapByte &= (0xFF << startBitInByte);
+
+                    if (i == nullEndByte)
+                        // We need to clear 8-endBitInByte most significant bits
+                        nullmapByte &= (0xFF >> (8 - endBitInByte));
+
+                    numNullsBefore += Columns.numberOfNullColumns(nullmapByte);
+                }
+
+                idx -= numNullsBefore;
+            }
+
+            idx -= cols.numberOfFixsizeColumns();
+
+            // Offset of idx-th column is from base offset.
+            int resOff = varlenItemOffset(idx);
+
+            long len = (idx == vartableItems() - 1) ?
+                // totalLength - columnStartOffset
+                (baseOff + chunkLength()) - resOff :
+                // nextColumnStartOffset - columnStartOffset
+                varlenItemOffset(idx + 1) - resOff;
+
+            return (len << 32) | resOff;
+        }
+    }
+
+    /**
+     * Tiny chunk format reader.
+     */
+    class TinyChunkReader extends AbstractChunkReader {
+        /**
+         * @param baseOff Base offset.
+         * @param nullMapLen Null-map length in bytes.
+         * @param hasVarTable Vartable presence flag.
+         */
+        TinyChunkReader(int baseOff, int nullMapLen, boolean hasVarTable) {
+            super(baseOff);
+
+            nullMapOff = baseOff + Byte.BYTES;
+            varTableOff = nullMapOff + nullMapLen;
+            dataOff = varTableOff + (hasVarTable ? Byte.BYTES + (readByte(varTableOff) & 0xFF) * Byte.BYTES : 0);
+        }
+
+        /** {@inheritDoc} */
+        @Override int chunkLength() {
+            return readByte(baseOff) & 0xFF;
+        }
+
+        /** {@inheritDoc} */
+        @Override int vartableItems() {
+            return hasVartable() ? (readByte(varTableOff) & 0xFF) : 0;
+        }
+
+        /** {@inheritDoc} */
+        @Override protected int varlenItemOffset(int itemIdx) {
+            return dataOff + (readByte(varTableOff + Byte.BYTES + itemIdx * Byte.BYTES) & 0xFF);
+        }
+    }
+
+    /**
+     * Large chunk format reader.
+     */
+    class LargeChunkReader extends AbstractChunkReader {
+        /**
+         * @param baseOff Base offset.
+         * @param nullMapLen Null-map length in bytes.
+         * @param hasVarTable Vartable presence flag.
+         */
+        LargeChunkReader(int baseOff, int nullMapLen, boolean hasVarTable) {
+            super(baseOff);
+
+            nullMapOff = baseOff + Integer.BYTES;
+            varTableOff = baseOff + Integer.BYTES + nullMapLen;
+            dataOff = varTableOff + (hasVarTable ? Integer.BYTES + readInteger(varTableOff) * Integer.BYTES : 0);
+        }
+
+        /** {@inheritDoc} */
+        @Override public int chunkLength() {
+            return readInteger(baseOff);
+        }
+
+        /** {@inheritDoc} */
+        @Override int vartableItems() {
+            return hasVartable() ? readInteger(varTableOff) : 0;
+        }
+
+        /** {@inheritDoc} */
+        @Override protected int varlenItemOffset(int itemIdx) {
+            return dataOff + readInteger(varTableOff + Integer.BYTES + itemIdx * Integer.BYTES);
+        }
+    }
 }
diff --git a/modules/schema/src/main/java/org/apache/ignite/internal/schema/row/RowAssembler.java b/modules/schema/src/main/java/org/apache/ignite/internal/schema/row/RowAssembler.java
new file mode 100644
index 0000000..b4e94c2
--- /dev/null
+++ b/modules/schema/src/main/java/org/apache/ignite/internal/schema/row/RowAssembler.java
@@ -0,0 +1,485 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.schema.row;
+
+import java.nio.charset.CharsetEncoder;
+import java.nio.charset.StandardCharsets;
+import java.util.BitSet;
+import java.util.UUID;
+import org.apache.ignite.internal.schema.AssemblyException;
+import org.apache.ignite.internal.schema.BinaryRow;
+import org.apache.ignite.internal.schema.BinaryRow.RowFlags;
+import org.apache.ignite.internal.schema.BitmaskNativeType;
+import org.apache.ignite.internal.schema.Column;
+import org.apache.ignite.internal.schema.Columns;
+import org.apache.ignite.internal.schema.NativeType;
+import org.apache.ignite.internal.schema.NativeTypeSpec;
+import org.apache.ignite.internal.schema.NativeTypes;
+import org.apache.ignite.internal.schema.SchemaDescriptor;
+
+import static org.apache.ignite.internal.schema.BinaryRow.KEY_CHUNK_OFFSET;
+import static org.apache.ignite.internal.schema.BinaryRow.KEY_HASH_FIELD_OFFSET;
+import static org.apache.ignite.internal.schema.BinaryRow.RowFlags.OMIT_KEY_VARTBL_FLAG;
+import static org.apache.ignite.internal.schema.BinaryRow.RowFlags.OMIT_VAL_VARTBL_FLAG;
+
+/**
+ * Utility class to build rows using column appending pattern. The external user of this class must consult
+ * with the schema and provide the columns in strict internal column sort order during the row construction.
+ * Additionally, the user of this class should pre-calculate the resulting row size when possible to avoid
+ * unnecessary data copies and allow some size-optimizations can be applied.
+ *
+ * @see #utf8EncodedLength(CharSequence)
+ */
+public class RowAssembler {
+    /** Schema. */
+    private final SchemaDescriptor schema;
+
+    /** The number of non-null varlen columns in values chunk. */
+    private final int nonNullVarlenValCols;
+
+    /** Target byte buffer to write to. */
+    private final ExpandableByteBuf buf;
+
+    /** Val write mode flag. */
+    private final boolean isSmallVal;
+
+    /** Current columns chunk. */
+    private Columns curCols;
+
+    /** Current field index (the field is unset). */
+    private int curCol;
+
+    /** Flags. */
+    private short flags;
+
+    /** Charset encoder for strings. Initialized lazily. */
+    private CharsetEncoder strEncoder;
+
+    /** Current chunk writer. */
+    private AbstractChunkWriter chunkWriter;
+
+    /**
+     * Calculates encoded string length.
+     *
+     * @param seq Char sequence.
+     * @return Encoded string length.
+     * @implNote This implementation is not tolerant to malformed char sequences.
+     */
+    public static int utf8EncodedLength(CharSequence seq) {
+        int cnt = 0;
+
+        for (int i = 0, len = seq.length(); i < len; i++) {
+            char ch = seq.charAt(i);
+
+            if (ch <= 0x7F)
+                cnt++;
+            else if (ch <= 0x7FF)
+                cnt += 2;
+            else if (Character.isHighSurrogate(ch)) {
+                cnt += 4;
+                ++i;
+            }
+            else
+                cnt += 3;
+        }
+
+        return cnt;
+    }
+
+    /**
+     * Creates RowAssembler for chunks of unknown size.
+     * <p>
+     * RowAssembler will use adaptive buffer size and omit some optimizations for small key/value chunks.
+     *
+     * @param schema Row schema.
+     * @param nonNullVarlenKeyCols Number of non-null varlen columns in key chunk.
+     * @param nonNullVarlenValCols Number of non-null varlen columns in value chunk.
+     */
+    public RowAssembler(
+        SchemaDescriptor schema,
+        int nonNullVarlenKeyCols,
+        int nonNullVarlenValCols
+    ) {
+        this(schema,
+            0,
+            schema.keyColumns().nullMapSize() > 0,
+            nonNullVarlenKeyCols,
+            0,
+            schema.valueColumns().nullMapSize() > 0,
+            nonNullVarlenValCols);
+    }
+
+    /**
+     * Creates RowAssembler for chunks with estimated sizes.
+     * <p>
+     * RowAssembler will apply optimizations based on chunks sizes estimations.
+     *
+     * @param schema Row schema.
+     * @param keyDataSize Key payload size. Estimated upper-bound or zero if unknown.
+     * @param nonNullVarlenKeyCols Number of non-null varlen columns in key chunk.
+     * @param valDataSize Value data size. Estimated upper-bound or zero if unknown.
+     * @param nonNullVarlenValCols Number of non-null varlen columns in value chunk.
+     */
+    public RowAssembler(
+        SchemaDescriptor schema,
+        int keyDataSize,
+        int nonNullVarlenKeyCols,
+        int valDataSize,
+        int nonNullVarlenValCols
+    ) {
+        this(
+            schema,
+            keyDataSize,
+            schema.keyColumns().nullMapSize() > 0,
+            nonNullVarlenKeyCols,
+            valDataSize,
+            schema.valueColumns().nullMapSize() > 0,
+            nonNullVarlenValCols);
+    }
+
+    /**
+     * Creates RowAssembler for chunks with estimated sizes.
+     * <p>
+     * RowAssembler will apply optimizations based on chunks sizes estimations.
+     *
+     * @param schema Row schema.
+     * @param keyDataSize Key payload size. Estimated upper-bound or zero if unknown.
+     * @param keyHasNulls Null flag. {@code True} if key has nulls values, {@code false} otherwise.
+     * @param nonNullVarlenKeyCols Number of non-null varlen columns in key chunk.
+     * @param valDataSize Value data size. Estimated upper-bound or zero if unknown.
+     * @param valHasNulls Null flag. {@code True} if value has nulls values, {@code false} otherwise.
+     * @param nonNullVarlenValCols Number of non-null varlen columns in value chunk.
+     */
+    public RowAssembler(
+        SchemaDescriptor schema,
+        int keyDataSize,
+        boolean keyHasNulls,
+        int nonNullVarlenKeyCols,
+        int valDataSize,
+        boolean valHasNulls,
+        int nonNullVarlenValCols
+    ) {
+        this.schema = schema;
+        this.nonNullVarlenValCols = nonNullVarlenValCols;
+
+        curCols = schema.keyColumns();
+        curCol = 0;
+        flags = 0;
+        strEncoder = null;
+
+        boolean isSmallKey = isTinyChunk(keyDataSize);
+        isSmallVal = isTinyChunk(valDataSize);
+
+        // Key flags.
+        if (schema.keyColumns().nullMapSize() == 0)
+            flags |= RowFlags.OMIT_KEY_NULL_MAP_FLAG;
+        if (nonNullVarlenKeyCols == 0)
+            flags |= OMIT_KEY_VARTBL_FLAG;
+        if (isSmallKey)
+            flags |= RowFlags.KEY_TYNY_FORMAT;
+
+        final int keyNullMapSize = keyHasNulls ? schema.keyColumns().nullMapSize() : 0;
+
+        int size = BinaryRow.HEADER_SIZE + keyDataSize + valDataSize +
+            keyNullMapSize +
+            (valHasNulls ? schema.valueColumns().nullMapSize() : 0) +
+            (isSmallKey ? TinyChunkWriter.vartableLength(nonNullVarlenKeyCols) :
+                LongChunkWriter.vartableLength(nonNullVarlenKeyCols)) +
+            (isSmallVal ? TinyChunkWriter.vartableLength(nonNullVarlenValCols) :
+                LongChunkWriter.vartableLength(nonNullVarlenValCols));
+
+        buf = new ExpandableByteBuf(size);
+        buf.putShort(0, (short)schema.version());
+
+        chunkWriter = createChunkWriter(KEY_CHUNK_OFFSET, keyNullMapSize, nonNullVarlenKeyCols, isSmallKey);
+    }
+
+    private boolean isTinyChunk(int dataSize) {
+        return dataSize > 0 && dataSize < 256;
+    }
+
+    /**
+     * Chunk writer factory method.
+     *
+     * @param baseOff Chunk base offset.
+     * @param nullMapLen Null-map size in bytes.
+     * @param vartblSize Amount of vartable items.
+     * @param tiny Tiny format flag.
+     * @return Chunk writer.
+     */
+    private AbstractChunkWriter createChunkWriter(int baseOff, int nullMapLen, int vartblSize, boolean tiny) {
+        return tiny ?
+            new TinyChunkWriter(
+                buf,
+                baseOff,
+                nullMapLen,
+                vartblSize) :
+            new LongChunkWriter(
+                buf,
+                baseOff,
+                nullMapLen,
+                vartblSize);
+    }
+
+    /**
+     * Appends {@code null} value for the current column to the chunk.
+     */
+    public void appendNull() {
+        Column col = curCols.column(curCol);
+
+        if (!col.nullable())
+            throw new IllegalArgumentException("Failed to set column (null was passed, but column is not nullable): " +
+                col);
+
+        chunkWriter.setNull(curCol);
+
+        shiftColumn();
+    }
+
+    /**
+     * Appends byte value for the current column to the chunk.
+     *
+     * @param val Column value.
+     */
+    public void appendByte(byte val) {
+        checkType(NativeTypes.BYTE);
+
+        chunkWriter.appendByte(val);
+
+        shiftColumn();
+    }
+
+    /**
+     * Appends short value for the current column to the chunk.
+     *
+     * @param val Column value.
+     */
+    public void appendShort(short val) {
+        checkType(NativeTypes.SHORT);
+
+        chunkWriter.appendShort(val);
+
+        shiftColumn();
+    }
+
+    /**
+     * Appends int value for the current column to the chunk.
+     *
+     * @param val Column value.
+     */
+    public void appendInt(int val) {
+        checkType(NativeTypes.INTEGER);
+
+        chunkWriter.appendInt(val);
+
+        shiftColumn();
+    }
+
+    /**
+     * Appends long value for the current column to the chunk.
+     *
+     * @param val Column value.
+     */
+    public void appendLong(long val) {
+        checkType(NativeTypes.LONG);
+
+        chunkWriter.appendLong(val);
+
+        shiftColumn();
+    }
+
+    /**
+     * Appends float value for the current column to the chunk.
+     *
+     * @param val Column value.
+     */
+    public void appendFloat(float val) {
+        checkType(NativeTypes.FLOAT);
+
+        chunkWriter.appendFloat(val);
+
+        shiftColumn();
+    }
+
+    /**
+     * Appends double value for the current column to the chunk.
+     *
+     * @param val Column value.
+     */
+    public void appendDouble(double val) {
+        checkType(NativeTypes.DOUBLE);
+
+        chunkWriter.appendDouble(val);
+
+        shiftColumn();
+    }
+
+    /**
+     * Appends UUID value for the current column to the chunk.
+     *
+     * @param uuid Column value.
+     */
+    public void appendUuid(UUID uuid) {
+        checkType(NativeTypes.UUID);
+
+        chunkWriter.appendUuid(uuid);
+
+        shiftColumn();
+    }
+
+    /**
+     * Appends String value for the current column to the chunk.
+     *
+     * @param val Column value.
+     */
+    public void appendString(String val) {
+        checkType(NativeTypes.STRING);
+
+        assert (flags & (schema.keyColumns() == curCols ? OMIT_KEY_VARTBL_FLAG : OMIT_VAL_VARTBL_FLAG)) == 0 :
+            "Illegal writing of varlen when 'omit vartable' flag is set for a chunk.";
+
+        chunkWriter.appendString(val, encoder());
+
+        shiftColumn();
+    }
+
+    /**
+     * Appends byte[] value for the current column to the chunk.
+     *
+     * @param val Column value.
+     */
+    public void appendBytes(byte[] val) {
+        checkType(NativeTypes.BYTES);
+
+        assert (flags & (schema.keyColumns() == curCols ? OMIT_KEY_VARTBL_FLAG : OMIT_VAL_VARTBL_FLAG)) == 0 :
+            "Illegal writing of varlen when 'omit vartable' flag is set for a chunk.";
+
+        chunkWriter.appendBytes(val);
+
+        shiftColumn();
+    }
+
+    /**
+     * Appends BitSet value for the current column to the chunk.
+     *
+     * @param bitSet Column value.
+     */
+    public void appendBitmask(BitSet bitSet) {
+        Column col = curCols.column(curCol);
+
+        checkType(NativeTypeSpec.BITMASK);
+
+        BitmaskNativeType maskType = (BitmaskNativeType)col.type();
+
+        if (bitSet.length() > maskType.bits())
+            throw new IllegalArgumentException("Failed to set bitmask for column '" + col.name() + "' " +
+                "(mask size exceeds allocated size) [mask=" + bitSet + ", maxSize=" + maskType.bits() + "]");
+
+        chunkWriter.appendBitmask(bitSet, maskType.sizeInBytes());
+
+        shiftColumn();
+    }
+
+    /**
+     * @return Serialized row.
+     */
+    public byte[] build() {
+        if (schema.keyColumns() == curCols)
+            throw new AssemblyException("Key column missed: colIdx=" + curCol);
+        else {
+            if (curCol == 0)
+                flags |= RowFlags.NO_VALUE_FLAG;
+            else if (schema.valueColumns().length() != curCol)
+                throw new AssemblyException("Value column missed: colIdx=" + curCol);
+        }
+
+        buf.putShort(BinaryRow.FLAGS_FIELD_OFFSET, flags);
+
+        return buf.toArray();
+    }
+
+    /**
+     * @return UTF-8 string encoder.
+     */
+    private CharsetEncoder encoder() {
+        if (strEncoder == null)
+            strEncoder = StandardCharsets.UTF_8.newEncoder();
+
+        return strEncoder;
+    }
+
+    /**
+     * Checks that the type being appended matches the column type.
+     *
+     * @param type Type spec that is attempted to be appended.
+     */
+    private void checkType(NativeTypeSpec type) {
+        Column col = curCols.column(curCol);
+
+        if (col.type().spec() != type)
+            throw new IllegalArgumentException("Failed to set column (int was passed, but column is of different " +
+                "type): " + col);
+    }
+
+    /**
+     * Checks that the type being appended matches the column type.
+     *
+     * @param type Type that is attempted to be appended.
+     */
+    private void checkType(NativeType type) {
+        checkType(type.spec());
+    }
+
+    /**
+     * Shifts current column indexes as necessary, also
+     * switch to value chunk writer when moving from key to value columns.
+     */
+    private void shiftColumn() {
+        curCol++;
+
+        if (curCol == curCols.length()) {
+            // Write sizes.
+            chunkWriter.flush();
+
+            if (schema.valueColumns() == curCols)
+                return; // No more columns.
+
+            // Write key hash.
+            buf.putInt(KEY_HASH_FIELD_OFFSET, XXHash32.hash(buf.unwrap(), chunkWriter.dataOff, chunkWriter.curOff - chunkWriter.dataOff));
+
+            // Switch key->value columns.
+            curCols = schema.valueColumns();
+            curCol = 0;
+
+            // Write value flags.
+            if (nonNullVarlenValCols == 0)
+                flags |= OMIT_VAL_VARTBL_FLAG;
+            if (schema.valueColumns().nullMapSize() == 0)
+                flags |= RowFlags.OMIT_VAL_NULL_MAP_FLAG;
+            if (isSmallVal)
+                flags |= RowFlags.VAL_TYNY_FORMAT;
+
+            // Create value chunk writer.
+            chunkWriter = createChunkWriter(
+                BinaryRow.HEADER_SIZE + chunkWriter.chunkLength() /* Key chunk size */,
+                schema.valueColumns().nullMapSize(),
+                nonNullVarlenValCols,
+                isSmallVal);
+        }
+    }
+}
diff --git a/modules/schema/src/main/java/org/apache/ignite/internal/schema/row/TinyChunkWriter.java b/modules/schema/src/main/java/org/apache/ignite/internal/schema/row/TinyChunkWriter.java
new file mode 100644
index 0000000..3bde275
--- /dev/null
+++ b/modules/schema/src/main/java/org/apache/ignite/internal/schema/row/TinyChunkWriter.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.schema.row;
+
+/**
+ * Row chunk writer for small key/value chunks.
+ *
+ * Uses {@code byte} values for coding sizes/offsets,
+ * supports chunks with payload less upt to 255 bytes.
+ */
+class TinyChunkWriter extends AbstractChunkWriter {
+    /**
+     * Calculates vartable length (in bytes).
+     *
+     * @param items Vartable items.
+     * @return Vartable size in bytes.
+     */
+    static int vartableLength(int items) {
+        return items == 0 ? 0 : Byte.BYTES /* Table size */ + items * Byte.BYTES;
+    }
+
+    /**
+     * Creates chunk writer to write chunk in tiny format.
+     *
+     * @param buf Row buffer.
+     * @param baseOff Chunk base offset.
+     * @param nullMapLen Null-map size in bytes.
+     * @param vartblSize Amount of vartable items.
+     */
+    TinyChunkWriter(ExpandableByteBuf buf, int baseOff, int nullMapLen, int vartblSize) {
+        super(
+            buf,
+            baseOff,
+            baseOff + Byte.BYTES /* Chunk size */,
+            baseOff + Byte.BYTES + nullMapLen,
+            baseOff + Byte.BYTES + nullMapLen + vartableLength(vartblSize));
+
+        curVartblItem = 0;
+    }
+
+    /** {@inheritDoc} */
+    @Override void flush() {
+        final int size = chunkLength();
+
+        assert size < (2 << 8) && size > 0 : "Size field value overflow: " + size;
+
+        buf.put(baseOff, (byte)size);
+
+        if (curVartblItem > 0)
+            buf.put(varTblOff, (byte)curVartblItem);
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void writeOffset(int tblEntryIdx, int off) {
+        final int itemOff = varTblOff + Byte.BYTES + tblEntryIdx * Byte.BYTES;
+
+        assert off < (2 << 8) && off >= 0 : "Varlen offset overflow: offset=" + off;
+        assert itemOff < dataOff : "Vartable overflow: size=" + itemOff;
+
+        buf.put(itemOff, (byte)off);
+    }
+}
diff --git a/modules/schema/src/main/java/org/apache/ignite/internal/schema/row/XXHash32.java b/modules/schema/src/main/java/org/apache/ignite/internal/schema/row/XXHash32.java
new file mode 100644
index 0000000..2b71c4a
--- /dev/null
+++ b/modules/schema/src/main/java/org/apache/ignite/internal/schema/row/XXHash32.java
@@ -0,0 +1,243 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.schema.row;
+
+import java.nio.ByteBuffer;
+import java.util.zip.Checksum;
+
+import static java.lang.Integer.rotateLeft;
+
+/**
+ * Implementation of the xxhash32 hash algorithm.
+ *
+ * Copied from Commons Compress 1.14 <a href=
+ * "https://git-wip-us.apache.org/repos/asf?p=commons-compress.git;a=blob;f=src/main/java/org/apache/commons/compress/compressors/lz4/XXHash32.java;h=a406ffc197449be594d46f0d2712b2d4786a1e68;hb=HEAD">https://git-wip-us.apache.org/repos/asf?p=commons-compress.git;a=blob;f=src/main/java/org/apache/commons/compress/compressors/lz4/XXHash32.java;h=a406ffc197449be594d46f0d2712b2d4786a1e68;hb=HEAD</a>
+ * <p>
+ * NotThreadSafe
+ *
+ * @see <a href="http://cyan4973.github.io/xxHash/">xxHash</a>
+ * @since 1.11
+ */
+public class XXHash32 implements Checksum {
+    /** Buffer size. */
+    private static final int BUF_SIZE = 16;
+
+    /** Rotation shift. */
+    private static final int ROTATE_BITS = 13;
+
+    /** Prime number. */
+    private static final int PRIME1 = (int) 2654435761L;
+    /** Prime number. */
+    private static final int PRIME2 = (int) 2246822519L;
+    /** Prime number. */
+    private static final int PRIME3 = (int) 3266489917L;
+    /** Prime number. */
+    private static final int PRIME4 =  668265263;
+    /** Prime number. */
+    private static final int PRIME5 =  374761393;
+
+    /** One-byte len buffer. */
+    private final byte[] oneByte = new byte[1];
+
+    /** State. */
+    private final int[] state = new int[4];
+
+    // Note: the code used to use ByteBuffer but the manual method is 50% faster
+    // See: http://git-wip-us.apache.org/repos/asf/commons-compress/diff/2f56fb5c
+    private final byte[] buffer = new byte[BUF_SIZE];
+
+    /** Seed. */
+    private final int seed;
+
+    /** */
+    private int totalLen;
+
+    /** Position. */
+    private int pos;
+
+    /** Set to true when the state array has been updated since the last reset. */
+    private boolean stateUpdated;
+
+    /**
+     * Calculates hash for byte buffer slice.
+     *
+     * @param buf Buffer.
+     * @param off Slice offset.
+     * @param len Slice len.
+     * @return Hash code.
+     */
+    public static int hash(ByteBuffer buf, int off, int len) {
+        final XXHash32 hasher = new XXHash32(0);
+
+        hasher.update(buf.array(), off + buf.arrayOffset(), len);
+
+        return (int)hasher.getValue();
+    }
+
+    /**
+     * Creates an XXHash32 instance with a seed of 0.
+     */
+    public XXHash32() {
+        this(0);
+    }
+
+    /**
+     * Creates an XXHash32 instance.
+     * @param seed the seed to use
+     */
+    public XXHash32(final int seed) {
+        this.seed = seed;
+        initializeState();
+    }
+
+    /** {@inheritDoc} */
+    @Override public void reset() {
+        initializeState();
+        totalLen = 0;
+        pos = 0;
+        stateUpdated = false;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void update(final int b) {
+        oneByte[0] = (byte) (b & 0xff);
+        update(oneByte, 0, 1);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void update(final byte[] b, int off, final int len) {
+        if (len <= 0) {
+            return;
+        }
+        totalLen += len;
+
+        final int end = off + len;
+
+        // Check if the unprocessed bytes and new bytes can fill a block of 16.
+        // Make this overflow safe in the event that len is Integer.MAX_VALUE.
+        // Equivalent to: (pos + len < BUF_SIZE)
+        if (pos + len - BUF_SIZE < 0) {
+            System.arraycopy(b, off, buffer, pos, len);
+            pos += len;
+            return;
+        }
+
+        // Process left-over bytes with new bytes
+        if (pos > 0) {
+            final int size = BUF_SIZE - pos;
+            System.arraycopy(b, off, buffer, pos, size);
+            process(buffer, 0);
+            off += size;
+        }
+
+        final int limit = end - BUF_SIZE;
+        while (off <= limit) {
+            process(b, off);
+            off += BUF_SIZE;
+        }
+
+        // Handle left-over bytes
+        if (off < end) {
+            pos = end - off;
+            System.arraycopy(b, off, buffer, 0, pos);
+        } else {
+            pos = 0;
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public long getValue() {
+        int hash;
+        if (stateUpdated) {
+            // Hash with the state
+            hash =
+                rotateLeft(state[0],  1) +
+                    rotateLeft(state[1],  7) +
+                    rotateLeft(state[2], 12) +
+                    rotateLeft(state[3], 18);
+        } else {
+            // Hash using the original seed from position 2
+            hash = state[2] + PRIME5;
+        }
+        hash += totalLen;
+
+        int idx = 0;
+        final int limit = pos - 4;
+        for (; idx <= limit; idx += 4) {
+            hash = rotateLeft(hash + getInt(buffer, idx) * PRIME3, 17) * PRIME4;
+        }
+        while (idx < pos) {
+            hash = rotateLeft(hash + (buffer[idx++] & 0xff) * PRIME5, 11) * PRIME1;
+        }
+
+        hash ^= hash >>> 15;
+        hash *= PRIME2;
+        hash ^= hash >>> 13;
+        hash *= PRIME3;
+        hash ^= hash >>> 16;
+        return hash & 0xffffffffL;
+    }
+
+    /**
+     * Gets the little-endian int from 4 bytes starting at the specified index.
+     *
+     * @param buffer The data
+     * @param idx The index
+     * @return The little-endian int
+     */
+    private static int getInt(final byte[] buffer, final int idx) {
+        return ((buffer[idx    ] & 0xff)      ) |
+            ((buffer[idx + 1] & 0xff) <<  8) |
+            ((buffer[idx + 2] & 0xff) << 16) |
+            ((buffer[idx + 3] & 0xff) << 24);
+    }
+
+    /**
+     * Initialize state.
+     */
+    private void initializeState() {
+        state[0] = seed + PRIME1 + PRIME2;
+        state[1] = seed + PRIME2;
+        state[2] = seed;
+        state[3] = seed - PRIME1;
+    }
+
+    /**
+     * @param b Buffer.
+     * @param offset Offset.
+     */
+    private void process(final byte[] b, final int offset) {
+        // local shadows for performance
+        int s0 = state[0];
+        int s1 = state[1];
+        int s2 = state[2];
+        int s3 = state[3];
+
+        s0 = rotateLeft(s0 + getInt(b, offset) * PRIME2, ROTATE_BITS) * PRIME1;
+        s1 = rotateLeft(s1 + getInt(b, offset + 4) * PRIME2, ROTATE_BITS) * PRIME1;
+        s2 = rotateLeft(s2 + getInt(b, offset + 8) * PRIME2, ROTATE_BITS) * PRIME1;
+        s3 = rotateLeft(s3 + getInt(b, offset + 12) * PRIME2, ROTATE_BITS) * PRIME1;
+
+        state[0] = s0;
+        state[1] = s1;
+        state[2] = s2;
+        state[3] = s3;
+
+        stateUpdated = true;
+    }
+}
diff --git a/modules/schema/src/test/java/org/apache/ignite/internal/schema/ExpandableByteBufTest.java b/modules/schema/src/test/java/org/apache/ignite/internal/schema/ExpandableByteBufTest.java
index dc78997..b03cfe4 100644
--- a/modules/schema/src/test/java/org/apache/ignite/internal/schema/ExpandableByteBufTest.java
+++ b/modules/schema/src/test/java/org/apache/ignite/internal/schema/ExpandableByteBufTest.java
@@ -20,6 +20,7 @@ package org.apache.ignite.internal.schema;
 import java.nio.ByteBuffer;
 import java.nio.ByteOrder;
 import java.nio.charset.StandardCharsets;
+import org.apache.ignite.internal.schema.row.ExpandableByteBuf;
 import org.junit.jupiter.api.Test;
 
 import static org.junit.jupiter.api.Assertions.assertArrayEquals;
diff --git a/modules/schema/src/test/java/org/apache/ignite/internal/schema/RowAssemblerTest.java b/modules/schema/src/test/java/org/apache/ignite/internal/schema/RowAssemblerTest.java
index 86edaef..82a7e3b 100644
--- a/modules/schema/src/test/java/org/apache/ignite/internal/schema/RowAssemblerTest.java
+++ b/modules/schema/src/test/java/org/apache/ignite/internal/schema/RowAssemblerTest.java
@@ -19,6 +19,7 @@ package org.apache.ignite.internal.schema;
 
 import java.util.Arrays;
 import java.util.UUID;
+import org.apache.ignite.internal.schema.row.RowAssembler;
 import org.junit.jupiter.api.Test;
 
 import static org.apache.ignite.internal.schema.NativeTypes.BYTE;
@@ -50,7 +51,7 @@ public class RowAssemblerTest {
         SchemaDescriptor schema = new SchemaDescriptor(tableId, 42, keyCols, valCols);
 
         {
-            RowAssembler asm = new RowAssembler(schema, 0, 0, 0);
+            RowAssembler asm = new RowAssembler(schema, 128, 0, 128, 0);
 
             asm.appendInt(33);
             asm.appendInt(-71);
@@ -59,7 +60,7 @@ public class RowAssemblerTest {
         }
 
         { // Null value.
-            RowAssembler asm = new RowAssembler(schema, 0, 0, 0);
+            RowAssembler asm = new RowAssembler(schema, 128, 0, 128, 0);
 
             asm.appendInt(-33);
             asm.appendNull();
@@ -68,7 +69,7 @@ public class RowAssemblerTest {
         }
 
         { // No value.
-            RowAssembler asm = new RowAssembler(schema, 0, 0, 0);
+            RowAssembler asm = new RowAssembler(schema, 128, 0, 128, 0);
 
             asm.appendInt(-33);
 
@@ -87,7 +88,7 @@ public class RowAssemblerTest {
         SchemaDescriptor schema = new SchemaDescriptor(tableId, 42, keyCols, valCols);
 
         { // With value.
-            RowAssembler asm = new RowAssembler(schema, 0, 0, 0);
+            RowAssembler asm = new RowAssembler(schema, 128, 0, 128, 0);
 
             asm.appendShort((short)33);
             asm.appendShort((short)71L);
@@ -96,7 +97,7 @@ public class RowAssemblerTest {
         }
 
         { // No value.
-            RowAssembler asm = new RowAssembler(schema, 0, 0, 0);
+            RowAssembler asm = new RowAssembler(schema, 128, 0, 128, 0);
 
             asm.appendShort((short)-33);
 
@@ -115,7 +116,7 @@ public class RowAssemblerTest {
         SchemaDescriptor schema = new SchemaDescriptor(tableId, 42, keyCols, valCols);
 
         {
-            RowAssembler asm = new RowAssembler(schema, 0, 0, 1);
+            RowAssembler asm = new RowAssembler(schema, 128, 0, 128, 1);
 
             asm.appendShort((short)-33);
             asm.appendString("val");
@@ -124,7 +125,7 @@ public class RowAssemblerTest {
         }
 
         { // Null value.
-            RowAssembler asm = new RowAssembler(schema, 0, 0, 0);
+            RowAssembler asm = new RowAssembler(schema, 128, 0, 128, 0);
 
             asm.appendShort((short)33);
             asm.appendNull();
@@ -133,7 +134,7 @@ public class RowAssemblerTest {
         }
 
         { // No value.
-            RowAssembler asm = new RowAssembler(schema, 0, 0, 0);
+            RowAssembler asm = new RowAssembler(schema, 128, 0, 128, 0);
 
             asm.appendShort((short)33);
 
@@ -152,7 +153,7 @@ public class RowAssemblerTest {
         SchemaDescriptor schema = new SchemaDescriptor(tableId, 42, keyCols, valCols);
 
         {
-            RowAssembler asm = new RowAssembler(schema, 0, 0, 1);
+            RowAssembler asm = new RowAssembler(schema, 128, 0, 128, 1);
 
             asm.appendShort((short)-33);
             asm.appendString("val");
@@ -161,7 +162,7 @@ public class RowAssemblerTest {
         }
 
         { // No value.
-            RowAssembler asm = new RowAssembler(schema, 0, 0, 0);
+            RowAssembler asm = new RowAssembler(schema, 128, 0, 128, 0);
 
             asm.appendShort((short)33);
 
@@ -180,7 +181,7 @@ public class RowAssemblerTest {
         SchemaDescriptor schema = new SchemaDescriptor(tableId, 42, keyCols, valCols);
 
         {
-            RowAssembler asm = new RowAssembler(schema, 0, 0, 0);
+            RowAssembler asm = new RowAssembler(schema, 128, 0, 128, 0);
 
             asm.appendShort((short)-33);
             asm.appendByte((byte)71);
@@ -189,7 +190,7 @@ public class RowAssemblerTest {
         }
 
         { // Null key.
-            RowAssembler asm = new RowAssembler(schema, 0, 0, 0);
+            RowAssembler asm = new RowAssembler(schema, 128, 0, 128, 0);
 
             asm.appendNull();
             asm.appendByte((byte)-71);
@@ -198,7 +199,7 @@ public class RowAssemblerTest {
         }
 
         { // No value.
-            RowAssembler asm = new RowAssembler(schema, 0, 0, 0);
+            RowAssembler asm = new RowAssembler(schema, 128, 0, 128, 0);
 
             asm.appendShort((short)33);
 
@@ -217,7 +218,7 @@ public class RowAssemblerTest {
         SchemaDescriptor schema = new SchemaDescriptor(tableId, 42, keyCols, valCols);
 
         {
-            RowAssembler asm = new RowAssembler(schema, 0, 0, 0);
+            RowAssembler asm = new RowAssembler(schema, 128, 0, 128, 0);
 
             asm.appendShort((short)-1133);
             asm.appendShort((short)-1071);
@@ -226,7 +227,7 @@ public class RowAssemblerTest {
         }
 
         { // Null key.
-            RowAssembler asm = new RowAssembler(schema, 0, 0, 0);
+            RowAssembler asm = new RowAssembler(schema, 128, 0, 128, 0);
 
             asm.appendNull();
             asm.appendShort((short)1171);
@@ -235,7 +236,7 @@ public class RowAssemblerTest {
         }
 
         { // Null value.
-            RowAssembler asm = new RowAssembler(schema, 0, 0, 0);
+            RowAssembler asm = new RowAssembler(schema, 128, 0, 128, 0);
 
             asm.appendShort((short)1133);
             asm.appendNull();
@@ -244,7 +245,7 @@ public class RowAssemblerTest {
         }
 
         { // Null both.
-            RowAssembler asm = new RowAssembler(schema, 0, 0, 0);
+            RowAssembler asm = new RowAssembler(schema, 128, 0, 128, 0);
 
             asm.appendNull();
             asm.appendNull();
@@ -253,7 +254,7 @@ public class RowAssemblerTest {
         }
 
         { // No value.
-            RowAssembler asm = new RowAssembler(schema, 0, 0, 0);
+            RowAssembler asm = new RowAssembler(schema, 128, 0, 128, 0);
 
             asm.appendShort((short)1133);
 
@@ -272,7 +273,7 @@ public class RowAssemblerTest {
         SchemaDescriptor schema = new SchemaDescriptor(tableId, 42, keyCols, valCols);
 
         {
-            RowAssembler asm = new RowAssembler(schema, 0, 0, 1);
+            RowAssembler asm = new RowAssembler(schema, 128, 0, 128, 1);
 
             asm.appendInt(-33);
             asm.appendString("val");
@@ -281,7 +282,7 @@ public class RowAssemblerTest {
         }
 
         { // Null key.
-            RowAssembler asm = new RowAssembler(schema, 0, 0, 1);
+            RowAssembler asm = new RowAssembler(schema, 128, 0, 128, 1);
 
             asm.appendNull();
             asm.appendString("val");
@@ -290,7 +291,7 @@ public class RowAssemblerTest {
         }
 
         { // Null value.
-            RowAssembler asm = new RowAssembler(schema, 0, 0, 0);
+            RowAssembler asm = new RowAssembler(schema, 128, 0, 128, 0);
 
             asm.appendInt(33);
             asm.appendNull();
@@ -299,7 +300,7 @@ public class RowAssemblerTest {
         }
 
         { // Null both.
-            RowAssembler asm = new RowAssembler(schema, 0, 0, 0);
+            RowAssembler asm = new RowAssembler(schema, 128, 0, 128, 0);
 
             asm.appendNull();
             asm.appendNull();
@@ -308,7 +309,7 @@ public class RowAssemblerTest {
         }
 
         { // No value.
-            RowAssembler asm = new RowAssembler(schema, 0, 0, 0);
+            RowAssembler asm = new RowAssembler(schema, 128, 0, 128, 0);
 
             asm.appendInt(33);
 
@@ -327,7 +328,7 @@ public class RowAssemblerTest {
         SchemaDescriptor schema = new SchemaDescriptor(tableId, 42, keyCols, valCols);
 
         {
-            RowAssembler asm = new RowAssembler(schema, 0, 0, 1);
+            RowAssembler asm = new RowAssembler(schema, 128, 0, 128, 1);
 
             asm.appendByte((byte)-33);
             asm.appendString("val");
@@ -336,7 +337,7 @@ public class RowAssemblerTest {
         }
 
         { // Null key.
-            RowAssembler asm = new RowAssembler(schema, 0, 0, 1);
+            RowAssembler asm = new RowAssembler(schema, 128, 0, 128, 1);
 
             asm.appendNull();
             asm.appendString("val");
@@ -345,7 +346,7 @@ public class RowAssemblerTest {
         }
 
         { // No value.
-            RowAssembler asm = new RowAssembler(schema, 0, 0, 0);
+            RowAssembler asm = new RowAssembler(schema, 128, 0, 128, 0);
 
             asm.appendByte((byte)33);
 
@@ -364,7 +365,7 @@ public class RowAssemblerTest {
         SchemaDescriptor schema = new SchemaDescriptor(tableId, 42, keyCols, valCols);
 
         {
-            RowAssembler asm = new RowAssembler(schema, 0, 1, 0);
+            RowAssembler asm = new RowAssembler(schema, 128, 1, 128, 0);
 
             asm.appendString("key");
             asm.appendUuid(uuidVal);
@@ -375,7 +376,7 @@ public class RowAssemblerTest {
         }
 
         { // Null value.
-            RowAssembler asm = new RowAssembler(schema, 0, 1, 0);
+            RowAssembler asm = new RowAssembler(schema, 128, 1, 128, 0);
 
             asm.appendString("key");
             asm.appendNull();
@@ -384,7 +385,7 @@ public class RowAssemblerTest {
         }
 
         { // No value.
-            RowAssembler asm = new RowAssembler(schema, 0, 1, 0);
+            RowAssembler asm = new RowAssembler(schema, 128, 1, 128, 0);
 
             asm.appendString("key");
 
@@ -403,7 +404,7 @@ public class RowAssemblerTest {
         SchemaDescriptor schema = new SchemaDescriptor(tableId, 42, keyCols, valCols);
 
         {
-            RowAssembler asm = new RowAssembler(schema, 0, 1, 0);
+            RowAssembler asm = new RowAssembler(schema, 128, 1, 128, 0);
 
             asm.appendString("key");
             asm.appendUuid(uuidVal);
@@ -414,7 +415,7 @@ public class RowAssemblerTest {
         }
 
         { // No value.
-            RowAssembler asm = new RowAssembler(schema, 0, 1, 0);
+            RowAssembler asm = new RowAssembler(schema, 128, 1, 128, 0);
 
             asm.appendString("key");
 
@@ -433,29 +434,29 @@ public class RowAssemblerTest {
         SchemaDescriptor schema = new SchemaDescriptor(tableId, 42, keyCols, valCols);
 
         {
-            RowAssembler asm = new RowAssembler(schema, 0, 1, 1);
+            RowAssembler asm = new RowAssembler(schema, 128, 1, 128, 1);
 
             asm.appendString("key");
             asm.appendBytes(new byte[] {-1, 1, 0, 120});
 
-            assertRowBytesEquals(new byte[] {42, 0, 2, 0, 95, -98, 1, 0, 11, 0, 0, 0, 1, 0, 8, 0, 107, 101, 121, 13, 0, 0, 0, 0, 1, 0, 9, 0, -1, 1, 0, 120}, asm.build());
+            assertRowBytesEquals(new byte[] {42, 0, 98, 0, -32, 58, -39, -77, 6, 1, 0, 107, 101, 121, 8, 0, 1, 0, -1, 1, 0, 120}, asm.build());
         }
 
         { // Null value.
-            RowAssembler asm = new RowAssembler(schema, 0, 1, 0);
+            RowAssembler asm = new RowAssembler(schema, 128, 1, 128, 0);
 
             asm.appendString("key");
             asm.appendNull();
 
-            assertRowBytesEquals(new byte[] {42, 0, 18, 0, 95, -98, 1, 0, 11, 0, 0, 0, 1, 0, 8, 0, 107, 101, 121, 5, 0, 0, 0, 1}, asm.build());
+            assertRowBytesEquals(new byte[] {42, 0, 106, 0, -32, 58, -39, -77, 6, 1, 0, 107, 101, 121, 2, 1}, asm.build());
         }
 
         { // No value.
-            RowAssembler asm = new RowAssembler(schema, 0, 1, 0);
+            RowAssembler asm = new RowAssembler(schema, 128, 1, 128, 0);
 
             asm.appendString("key");
 
-            assertRowBytesEquals(new byte[] {42, 0, 19, 0, 95, -98, 1, 0, 11, 0, 0, 0, 1, 0, 8, 0, 107, 101, 121}, asm.build());
+            assertRowBytesEquals(new byte[] {42, 0, 107, 0, -32, 58, -39, -77, 6, 1, 0, 107, 101, 121}, asm.build());
         }
     }
 
@@ -470,7 +471,7 @@ public class RowAssemblerTest {
         SchemaDescriptor schema = new SchemaDescriptor(tableId, 42, keyCols, valCols);
 
         {
-            RowAssembler asm = new RowAssembler(schema, 0, 1, 1);
+            RowAssembler asm = new RowAssembler(schema, 128, 1, 128, 1);
 
             asm.appendString("key");
             asm.appendBytes(new byte[] {-1, 1, 0, 120});
@@ -479,7 +480,7 @@ public class RowAssemblerTest {
         }
 
         { // No value.
-            RowAssembler asm = new RowAssembler(schema, 0, 1, 0);
+            RowAssembler asm = new RowAssembler(schema, 128, 1, 128, 0);
 
             asm.appendString("key");
 
@@ -498,47 +499,47 @@ public class RowAssemblerTest {
         SchemaDescriptor schema = new SchemaDescriptor(tableId, 42, keyCols, valCols);
 
         {
-            RowAssembler asm = new RowAssembler(schema, 0, 1, 0);
+            RowAssembler asm = new RowAssembler(schema, 128, 1, 128, 0);
 
             asm.appendString("key");
             asm.appendShort((short)-71);
 
-            assertRowBytesEquals(new byte[] {42, 0, 16, 0, 95, -98, 1, 0, 12, 0, 0, 0, 0, 1, 0, 9, 0, 107, 101, 121, 7, 0, 0, 0, 0, -71, -1}, asm.build());
+            assertRowBytesEquals(new byte[] {42, 0, 104, 0, -32, 58, -39, -77, 7, 0, 1, 0, 107, 101, 121, 4, 0, -71, -1}, asm.build());
         }
 
         { // Null key.
-            RowAssembler asm = new RowAssembler(schema, 0, 0, 0);
+            RowAssembler asm = new RowAssembler(schema, 128, 0, 128, 0);
 
             asm.appendNull();
             asm.appendShort((short)71);
 
-            assertRowBytesEquals(new byte[] {42, 0, 24, 0, 0, 0, 0, 0, 5, 0, 0, 0, 1, 7, 0, 0, 0, 0, 71, 0}, asm.build());
+            assertRowBytesEquals(new byte[] {42, 0, 104, 0, 5, 93, -52, 2, 2, 1, 4, 0, 71, 0}, asm.build());
         }
 
         { // Null value.
-            RowAssembler asm = new RowAssembler(schema, 0, 1, 0);
+            RowAssembler asm = new RowAssembler(schema, 128, 1, 128, 0);
 
             asm.appendString("key");
             asm.appendNull();
 
-            assertRowBytesEquals(new byte[] {42, 0, 16, 0, 95, -98, 1, 0, 12, 0, 0, 0, 0, 1, 0, 9, 0, 107, 101, 121, 5, 0, 0, 0, 1}, asm.build());
+            assertRowBytesEquals(new byte[] {42, 0, 104, 0, -32, 58, -39, -77, 7, 0, 1, 0, 107, 101, 121, 2, 1}, asm.build());
         }
 
         { // Null both.
-            RowAssembler asm = new RowAssembler(schema, 0, 0, 0);
+            RowAssembler asm = new RowAssembler(schema, 128, 0, 128, 0);
 
             asm.appendNull();
             asm.appendNull();
 
-            assertRowBytesEquals(new byte[] {42, 0, 24, 0, 0, 0, 0, 0, 5, 0, 0, 0, 1, 5, 0, 0, 0, 1}, asm.build());
+            assertRowBytesEquals(new byte[] {42, 0, 104, 0, 5, 93, -52, 2, 2, 1, 2, 1}, asm.build());
         }
 
         { // No value.
-            RowAssembler asm = new RowAssembler(schema, 0, 1, 0);
+            RowAssembler asm = new RowAssembler(schema, 128, 1, 128, 0);
 
             asm.appendString("key");
 
-            assertRowBytesEquals(new byte[] {42, 0, 17, 0, 95, -98, 1, 0, 12, 0, 0, 0, 0, 1, 0, 9, 0, 107, 101, 121}, asm.build());
+            assertRowBytesEquals(new byte[] {42, 0, 105, 0, -32, 58, -39, -77, 7, 0, 1, 0, 107, 101, 121}, asm.build());
         }
     }
 
@@ -553,7 +554,7 @@ public class RowAssemblerTest {
         SchemaDescriptor schema = new SchemaDescriptor(tableId, 42, keyCols, valCols);
 
         {
-            RowAssembler asm = new RowAssembler(schema, 0, 1, 0);
+            RowAssembler asm = new RowAssembler(schema, 128, 1, 128, 0);
 
             asm.appendString("key");
             asm.appendShort((short)-71L);
@@ -562,7 +563,7 @@ public class RowAssemblerTest {
         }
 
         { // Null key.
-            RowAssembler asm = new RowAssembler(schema, 0, 0, 0);
+            RowAssembler asm = new RowAssembler(schema, 128, 0, 128, 0);
 
             asm.appendNull();
             asm.appendShort((short)71);
@@ -571,7 +572,7 @@ public class RowAssemblerTest {
         }
 
         { // No value.
-            RowAssembler asm = new RowAssembler(schema, 0, 1, 0);
+            RowAssembler asm = new RowAssembler(schema, 128, 1, 128, 0);
 
             asm.appendString("key");
 
@@ -590,7 +591,7 @@ public class RowAssemblerTest {
         SchemaDescriptor schema = new SchemaDescriptor(tableId, 42, keyCols, valCols);
 
         {
-            RowAssembler asm = new RowAssembler(schema, 0, 1, 1);
+            RowAssembler asm = new RowAssembler(schema, 128, 1, 128, 1);
 
             asm.appendString("key");
             asm.appendBytes(new byte[] {-1, 1, 0, 120});
@@ -599,7 +600,7 @@ public class RowAssemblerTest {
         }
 
         { // Null key.
-            RowAssembler asm = new RowAssembler(schema, 0, 0, 1);
+            RowAssembler asm = new RowAssembler(schema, 128, 0, 128, 1);
 
             asm.appendNull();
             asm.appendBytes(new byte[] {-1, 1, 0, 120});
@@ -608,7 +609,7 @@ public class RowAssemblerTest {
         }
 
         { // Null value.
-            RowAssembler asm = new RowAssembler(schema, 0, 1, 0);
+            RowAssembler asm = new RowAssembler(schema, 128, 1, 128, 0);
 
             asm.appendString("key");
             asm.appendNull();
@@ -617,7 +618,7 @@ public class RowAssemblerTest {
         }
 
         { // Null both.
-            RowAssembler asm = new RowAssembler(schema, 0, 0, 0);
+            RowAssembler asm = new RowAssembler(schema, 128, 0, 128, 0);
 
             asm.appendNull();
             asm.appendNull();
@@ -626,7 +627,7 @@ public class RowAssemblerTest {
         }
 
         { // No value.
-            RowAssembler asm = new RowAssembler(schema, 0, 1, 0);
+            RowAssembler asm = new RowAssembler(schema, 128, 1, 128, 0);
 
             asm.appendString("key");
 
@@ -645,7 +646,7 @@ public class RowAssemblerTest {
         SchemaDescriptor schema = new SchemaDescriptor(tableId, 42, keyCols, valCols);
 
         {
-            RowAssembler asm = new RowAssembler(schema, 0, 1, 1);
+            RowAssembler asm = new RowAssembler(schema, 128, 1, 128, 1);
 
             asm.appendString("key");
             asm.appendBytes(new byte[] {-1, 1, 0, 120});
@@ -654,7 +655,7 @@ public class RowAssemblerTest {
         }
 
         { // Null key.
-            RowAssembler asm = new RowAssembler(schema, 0, 0, 1);
+            RowAssembler asm = new RowAssembler(schema, 128, 0, 128, 1);
 
             asm.appendNull();
             asm.appendBytes(new byte[] {-1, 1, 0, 120});
@@ -663,7 +664,7 @@ public class RowAssemblerTest {
         }
 
         { // No value.
-            RowAssembler asm = new RowAssembler(schema, 0, 1, 0);
+            RowAssembler asm = new RowAssembler(schema, 128, 1, 128, 0);
 
             asm.appendString("key");
 
@@ -685,10 +686,10 @@ public class RowAssemblerTest {
             new Column("valStrCol", STRING, true)
         };
 
-        SchemaDescriptor schema = new SchemaDescriptor(tableId,42, keyCols, valCols);
+        SchemaDescriptor schema = new SchemaDescriptor(tableId, 42, keyCols, valCols);
 
         {
-            RowAssembler asm = new RowAssembler(schema, 0, 1, 1);
+            RowAssembler asm = new RowAssembler(schema, 128, 1, 128, 1);
 
             asm.appendShort((short)33);
             asm.appendString("keystr");
@@ -702,7 +703,7 @@ public class RowAssemblerTest {
         }
 
         { // Null value.
-            RowAssembler asm = new RowAssembler(schema, 0, 1, 0);
+            RowAssembler asm = new RowAssembler(schema, 128, 1, 128, 0);
 
             asm.appendShort((short)33);
             asm.appendString("keystr2");
@@ -716,7 +717,7 @@ public class RowAssemblerTest {
         }
 
         { // No value.
-            RowAssembler asm = new RowAssembler(schema, 0, 1, 0);
+            RowAssembler asm = new RowAssembler(schema, 128, 1, 128, 0);
 
             asm.appendShort((short)33);
             asm.appendString("keystr");
diff --git a/modules/schema/src/test/java/org/apache/ignite/internal/schema/RowTest.java b/modules/schema/src/test/java/org/apache/ignite/internal/schema/RowTest.java
index 659dd5f..5b87c56 100644
--- a/modules/schema/src/test/java/org/apache/ignite/internal/schema/RowTest.java
+++ b/modules/schema/src/test/java/org/apache/ignite/internal/schema/RowTest.java
@@ -21,18 +21,20 @@ import java.util.Arrays;
 import java.util.BitSet;
 import java.util.Collections;
 import java.util.Random;
+import org.apache.ignite.internal.schema.row.Row;
+import org.apache.ignite.internal.schema.row.RowAssembler;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 
 import static org.apache.ignite.internal.schema.NativeTypes.BYTE;
+import static org.apache.ignite.internal.schema.NativeTypes.BYTES;
 import static org.apache.ignite.internal.schema.NativeTypes.DOUBLE;
 import static org.apache.ignite.internal.schema.NativeTypes.FLOAT;
 import static org.apache.ignite.internal.schema.NativeTypes.INTEGER;
 import static org.apache.ignite.internal.schema.NativeTypes.LONG;
 import static org.apache.ignite.internal.schema.NativeTypes.SHORT;
-import static org.apache.ignite.internal.schema.NativeTypes.UUID;
-import static org.apache.ignite.internal.schema.NativeTypes.BYTES;
 import static org.apache.ignite.internal.schema.NativeTypes.STRING;
+import static org.apache.ignite.internal.schema.NativeTypes.UUID;
 import static org.junit.jupiter.api.Assertions.assertArrayEquals;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 
@@ -261,44 +263,29 @@ public class RowTest {
 
         int nonNullVarLenKeyCols = 0;
         int nonNullVarLenValCols = 0;
-        int nonNullVarLenKeySize = 0;
-        int nonNullVarLenValSize = 0;
 
         for (int i = 0; i < vals.length; i++) {
             NativeTypeSpec type = schema.column(i).type().spec();
 
             if (vals[i] != null && !type.fixedLength()) {
                 if (type == NativeTypeSpec.BYTES) {
-                    byte[] val = (byte[])vals[i];
-                    if (schema.isKeyColumn(i)) {
+                    if (schema.isKeyColumn(i))
                         nonNullVarLenKeyCols++;
-                        nonNullVarLenKeySize += val.length;
-                    }
-                    else {
+                    else
                         nonNullVarLenValCols++;
-                        nonNullVarLenValSize += val.length;
-                    }
                 }
                 else if (type == NativeTypeSpec.STRING) {
-                    if (schema.isKeyColumn(i)) {
+                    if (schema.isKeyColumn(i))
                         nonNullVarLenKeyCols++;
-                        nonNullVarLenKeySize += RowAssembler.utf8EncodedLength((CharSequence)vals[i]);
-                    }
-                    else {
+                    else
                         nonNullVarLenValCols++;
-                        nonNullVarLenValSize += RowAssembler.utf8EncodedLength((CharSequence)vals[i]);
-                    }
                 }
                 else
                     throw new IllegalStateException("Unsupported variable-length type: " + type);
             }
         }
 
-        int size = RowAssembler.rowSize(
-            schema.keyColumns(), nonNullVarLenKeyCols, nonNullVarLenKeySize,
-            schema.valueColumns(), nonNullVarLenValCols, nonNullVarLenValSize);
-
-        RowAssembler asm = new RowAssembler(schema, size, nonNullVarLenKeyCols, nonNullVarLenValCols);
+        RowAssembler asm = new RowAssembler(schema, nonNullVarLenKeyCols, nonNullVarLenValCols);
 
         for (int i = 0; i < vals.length; i++) {
             if (vals[i] == null)
@@ -355,7 +342,7 @@ public class RowTest {
 
         byte[] data = asm.build();
 
-        Row tup = new Row(schema, new ByteBufferRow(data));
+        Row row = new Row(schema, new ByteBufferRow(data));
 
         for (int i = 0; i < vals.length; i++) {
             Column col = schema.column(i);
@@ -363,10 +350,10 @@ public class RowTest {
             NativeTypeSpec type = col.type().spec();
 
             if (type == NativeTypeSpec.BYTES)
-                assertArrayEquals((byte[])vals[i], (byte[])NativeTypeSpec.BYTES.objectValue(tup, i),
+                assertArrayEquals((byte[])vals[i], (byte[])NativeTypeSpec.BYTES.objectValue(row, i),
                     "Failed for column: " + col);
             else
-                assertEquals(vals[i], type.objectValue(tup, i), "Failed for column: " + col);
+                assertEquals(vals[i], type.objectValue(row, i), "Failed for column: " + col);
         }
     }
 
diff --git a/modules/schema/src/test/java/org/apache/ignite/internal/schema/marshaller/reflection/FieldAccessorTest.java b/modules/schema/src/test/java/org/apache/ignite/internal/schema/marshaller/reflection/FieldAccessorTest.java
index 01bda07..ceec3bd 100644
--- a/modules/schema/src/test/java/org/apache/ignite/internal/schema/marshaller/reflection/FieldAccessorTest.java
+++ b/modules/schema/src/test/java/org/apache/ignite/internal/schema/marshaller/reflection/FieldAccessorTest.java
@@ -25,8 +25,8 @@ import java.util.Random;
 import java.util.UUID;
 import org.apache.ignite.internal.schema.Column;
 import org.apache.ignite.internal.schema.NativeTypes;
-import org.apache.ignite.internal.schema.Row;
-import org.apache.ignite.internal.schema.RowAssembler;
+import org.apache.ignite.internal.schema.row.Row;
+import org.apache.ignite.internal.schema.row.RowAssembler;
 import org.apache.ignite.internal.schema.TestUtils;
 import org.apache.ignite.internal.schema.marshaller.BinaryMode;
 import org.apache.ignite.internal.schema.marshaller.SerializationException;
diff --git a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ITDistributedTableTest.java b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ITDistributedTableTest.java
index 72ac0ce..bb8103b 100644
--- a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ITDistributedTableTest.java
+++ b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ITDistributedTableTest.java
@@ -31,8 +31,8 @@ import org.apache.ignite.internal.affinity.RendezvousAffinityFunction;
 import org.apache.ignite.internal.schema.ByteBufferRow;
 import org.apache.ignite.internal.schema.Column;
 import org.apache.ignite.internal.schema.NativeTypes;
-import org.apache.ignite.internal.schema.Row;
-import org.apache.ignite.internal.schema.RowAssembler;
+import org.apache.ignite.internal.schema.row.Row;
+import org.apache.ignite.internal.schema.row.RowAssembler;
 import org.apache.ignite.internal.schema.SchemaDescriptor;
 import org.apache.ignite.internal.schema.SchemaRegistry;
 import org.apache.ignite.internal.table.TableImpl;
@@ -203,7 +203,7 @@ public class ITDistributedTableTest {
      * @return Row.
      */
     @NotNull private Row getTestKey() {
-        RowAssembler rowBuilder = new RowAssembler(SCHEMA, 4096, 0, 0);
+        RowAssembler rowBuilder = new RowAssembler(SCHEMA, 0, 0);
 
         rowBuilder.appendLong(1L);
 
@@ -216,7 +216,7 @@ public class ITDistributedTableTest {
      * @return Row.
      */
     @NotNull private Row getTestRow() {
-        RowAssembler rowBuilder = new RowAssembler(SCHEMA, 4096, 0, 0);
+        RowAssembler rowBuilder = new RowAssembler(SCHEMA, 0, 0);
 
         rowBuilder.appendLong(1L);
         rowBuilder.appendLong(10L);
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/schema/marshaller/KVSerializer.java b/modules/table/src/main/java/org/apache/ignite/internal/schema/marshaller/KVSerializer.java
index 5c4f32d..f13e20a 100644
--- a/modules/table/src/main/java/org/apache/ignite/internal/schema/marshaller/KVSerializer.java
+++ b/modules/table/src/main/java/org/apache/ignite/internal/schema/marshaller/KVSerializer.java
@@ -17,7 +17,7 @@
 
 package org.apache.ignite.internal.schema.marshaller;
 
-import org.apache.ignite.internal.schema.Row;
+import org.apache.ignite.internal.schema.row.Row;
 import org.jetbrains.annotations.NotNull;
 import org.jetbrains.annotations.Nullable;
 
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/schema/marshaller/RecordSerializer.java b/modules/table/src/main/java/org/apache/ignite/internal/schema/marshaller/RecordSerializer.java
index 08c676e..c74a5c8 100644
--- a/modules/table/src/main/java/org/apache/ignite/internal/schema/marshaller/RecordSerializer.java
+++ b/modules/table/src/main/java/org/apache/ignite/internal/schema/marshaller/RecordSerializer.java
@@ -17,7 +17,7 @@
 
 package org.apache.ignite.internal.schema.marshaller;
 
-import org.apache.ignite.internal.schema.Row;
+import org.apache.ignite.internal.schema.row.Row;
 import org.jetbrains.annotations.NotNull;
 
 /**
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/schema/marshaller/TupleMarshaller.java b/modules/table/src/main/java/org/apache/ignite/internal/schema/marshaller/TupleMarshaller.java
index e9728b4..7556fe3 100644
--- a/modules/table/src/main/java/org/apache/ignite/internal/schema/marshaller/TupleMarshaller.java
+++ b/modules/table/src/main/java/org/apache/ignite/internal/schema/marshaller/TupleMarshaller.java
@@ -17,7 +17,7 @@
 
 package org.apache.ignite.internal.schema.marshaller;
 
-import org.apache.ignite.internal.schema.Row;
+import org.apache.ignite.internal.schema.row.Row;
 import org.apache.ignite.table.Tuple;
 import org.jetbrains.annotations.NotNull;
 import org.jetbrains.annotations.Nullable;
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/KVBinaryViewImpl.java b/modules/table/src/main/java/org/apache/ignite/internal/table/KVBinaryViewImpl.java
index be1c3ac..174e577 100644
--- a/modules/table/src/main/java/org/apache/ignite/internal/table/KVBinaryViewImpl.java
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/KVBinaryViewImpl.java
@@ -24,7 +24,7 @@ import java.util.Objects;
 import java.util.concurrent.CompletableFuture;
 import java.util.stream.Collectors;
 import org.apache.ignite.internal.schema.BinaryRow;
-import org.apache.ignite.internal.schema.Row;
+import org.apache.ignite.internal.schema.row.Row;
 import org.apache.ignite.internal.schema.SchemaDescriptor;
 import org.apache.ignite.internal.schema.marshaller.TupleMarshaller;
 import org.apache.ignite.internal.schema.SchemaRegistry;
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/KVViewImpl.java b/modules/table/src/main/java/org/apache/ignite/internal/table/KVViewImpl.java
index c744e93..91e3a19 100644
--- a/modules/table/src/main/java/org/apache/ignite/internal/table/KVViewImpl.java
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/KVViewImpl.java
@@ -23,7 +23,7 @@ import java.util.Map;
 import java.util.Objects;
 import java.util.concurrent.CompletableFuture;
 import org.apache.ignite.internal.schema.BinaryRow;
-import org.apache.ignite.internal.schema.Row;
+import org.apache.ignite.internal.schema.row.Row;
 import org.apache.ignite.internal.schema.SchemaDescriptor;
 import org.apache.ignite.internal.schema.marshaller.KVSerializer;
 import org.apache.ignite.internal.schema.SchemaRegistry;
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/RecordViewImpl.java b/modules/table/src/main/java/org/apache/ignite/internal/table/RecordViewImpl.java
index 27bdc44..f535508 100644
--- a/modules/table/src/main/java/org/apache/ignite/internal/table/RecordViewImpl.java
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/RecordViewImpl.java
@@ -23,7 +23,7 @@ import java.util.Map;
 import java.util.Objects;
 import java.util.concurrent.CompletableFuture;
 import org.apache.ignite.internal.schema.BinaryRow;
-import org.apache.ignite.internal.schema.Row;
+import org.apache.ignite.internal.schema.row.Row;
 import org.apache.ignite.internal.schema.SchemaDescriptor;
 import org.apache.ignite.internal.schema.marshaller.RecordSerializer;
 import org.apache.ignite.internal.schema.SchemaRegistry;
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/RowChunkAdapter.java b/modules/table/src/main/java/org/apache/ignite/internal/table/RowChunkAdapter.java
index 9b0483a..2e213f1 100644
--- a/modules/table/src/main/java/org/apache/ignite/internal/table/RowChunkAdapter.java
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/RowChunkAdapter.java
@@ -22,7 +22,7 @@ import java.util.UUID;
 import org.apache.ignite.binary.BinaryObject;
 import org.apache.ignite.binary.BinaryObjects;
 import org.apache.ignite.internal.schema.Column;
-import org.apache.ignite.internal.schema.Row;
+import org.apache.ignite.internal.schema.row.Row;
 import org.apache.ignite.table.Tuple;
 import org.jetbrains.annotations.NotNull;
 
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/TableImpl.java b/modules/table/src/main/java/org/apache/ignite/internal/table/TableImpl.java
index a7cee35..83884ac 100644
--- a/modules/table/src/main/java/org/apache/ignite/internal/table/TableImpl.java
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/TableImpl.java
@@ -25,7 +25,7 @@ import java.util.Objects;
 import java.util.concurrent.CompletableFuture;
 import java.util.stream.Collectors;
 import org.apache.ignite.internal.schema.BinaryRow;
-import org.apache.ignite.internal.schema.Row;
+import org.apache.ignite.internal.schema.row.Row;
 import org.apache.ignite.internal.schema.SchemaDescriptor;
 import org.apache.ignite.internal.schema.marshaller.TupleMarshaller;
 import org.apache.ignite.internal.schema.SchemaRegistry;
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/TableRow.java b/modules/table/src/main/java/org/apache/ignite/internal/table/TableRow.java
index 74ed57a..31ab606 100644
--- a/modules/table/src/main/java/org/apache/ignite/internal/table/TableRow.java
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/TableRow.java
@@ -19,7 +19,7 @@ package org.apache.ignite.internal.table;
 
 import java.util.Objects;
 import org.apache.ignite.internal.schema.Column;
-import org.apache.ignite.internal.schema.Row;
+import org.apache.ignite.internal.schema.row.Row;
 import org.apache.ignite.internal.schema.SchemaDescriptor;
 import org.apache.ignite.table.Tuple;
 import org.jetbrains.annotations.NotNull;
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/TupleMarshallerImpl.java b/modules/table/src/main/java/org/apache/ignite/internal/table/TupleMarshallerImpl.java
index 7a6aed4..2966b92 100644
--- a/modules/table/src/main/java/org/apache/ignite/internal/table/TupleMarshallerImpl.java
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/TupleMarshallerImpl.java
@@ -19,19 +19,16 @@ package org.apache.ignite.internal.table;
 
 import java.util.Arrays;
 import java.util.BitSet;
-import java.util.HashMap;
-import java.util.Map;
 import java.util.Objects;
 import java.util.UUID;
-import org.apache.ignite.internal.schema.BinaryRow;
 import org.apache.ignite.internal.schema.ByteBufferRow;
 import org.apache.ignite.internal.schema.Column;
 import org.apache.ignite.internal.schema.Columns;
-import org.apache.ignite.internal.schema.Row;
-import org.apache.ignite.internal.schema.RowAssembler;
 import org.apache.ignite.internal.schema.SchemaDescriptor;
-import org.apache.ignite.internal.schema.marshaller.TupleMarshaller;
 import org.apache.ignite.internal.schema.SchemaRegistry;
+import org.apache.ignite.internal.schema.marshaller.TupleMarshaller;
+import org.apache.ignite.internal.schema.row.Row;
+import org.apache.ignite.internal.schema.row.RowAssembler;
 import org.apache.ignite.table.Tuple;
 import org.jetbrains.annotations.NotNull;
 import org.jetbrains.annotations.Nullable;
@@ -65,54 +62,57 @@ public class TupleMarshallerImpl implements TupleMarshaller {
 
         validate(keyTuple, schema.keyColumns());
 
-        ChunkData keyChunk = chunkData(schema.keyColumns(), keyTuple);
-        ChunkData valChunk = chunkData(schema.valueColumns(), valTuple);
+        TupleStatistics keyChunk = tupleStatistics(schema.keyColumns(), keyTuple);
+        TupleStatistics valChunk = tupleStatistics(schema.valueColumns(), valTuple);
 
         final RowAssembler rowBuilder = createAssembler(schema, keyChunk, valChunk);
 
         for (int i = 0; i < schema.keyColumns().length(); i++) {
             final Column col = schema.keyColumns().column(i);
 
-            writeColumn(rowBuilder, col, keyChunk.data.get(i));
+            writeColumn(rowBuilder, col, keyTuple.value(col.name()));
         }
 
-        if (valChunk.data != null) {
+        if (valTuple != null) {
             validate(valTuple, schema.valueColumns());
 
             for (int i = 0; i < schema.valueColumns().length(); i++) {
                 final Column col = schema.valueColumns().column(i);
 
-                writeColumn(rowBuilder, col, valChunk.data.get(i));
+                writeColumn(rowBuilder, col, valTuple.value(col.name()));
             }
         }
 
         return new Row(schema, new ByteBufferRow(rowBuilder.build()));
     }
 
-    private ChunkData chunkData(Columns cols, Tuple tuple) {
-        if (tuple == null)
-            return new ChunkData();
-
-        ChunkData chunk = new ChunkData();
+    /**
+     * Analyze given tuple and gather statistics.
+     *
+     * @param cols Columns which statistics is calculated for.
+     * @param tup Tuple to analyze.
+     * @return Tuple statistics.
+     */
+    private TupleStatistics tupleStatistics(Columns cols, Tuple tup) {
+        if (tup == null)
+            return new TupleStatistics();
 
-        chunk.data = new HashMap<>();
+        TupleStatistics chunk = new TupleStatistics();
 
         for (int i = 0; i < cols.length(); i++) {
             Column col = cols.column(i);
 
-            Object val = (tuple.contains(col.name())) ? tuple.value(col.name()) : col.defaultValue();
+            Object val = (tup.contains(col.name())) ? tup.value(col.name()) : col.defaultValue();
 
             if (val == null)
                 chunk.hasNulls = true;
             else {
-                chunk.data.put(i, val);
-
                 if (col.type().spec().fixedLength())
-                    chunk.dataSize += col.type().sizeInBytes();
+                    chunk.payloadLen += col.type().sizeInBytes();
                 else {
                     chunk.nonNullVarlen++;
 
-                    chunk.dataSize += getValueSize(val, col.type());
+                    chunk.payloadLen += getValueSize(val, col.type());
                 }
             }
         }
@@ -120,16 +120,12 @@ public class TupleMarshallerImpl implements TupleMarshaller {
         return chunk;
     }
 
-    class ChunkData {
-        public boolean hasNulls;
-        public int dataSize;
-        public int nonNullVarlen;
-        Map<Integer, Object> data;
-
-    }
-
     /**
+     * Validates columns values.
      *
+     * @param tuple Tuple to validate.
+     * @param columns Columns to validate against.
+     * @throws SchemaMismatchException If validation failed.
      */
     private void validate(Tuple tuple, Columns columns) {
         if (tuple instanceof TupleBuilderImpl) {
@@ -148,23 +144,19 @@ public class TupleMarshallerImpl implements TupleMarshaller {
     /**
      * Creates {@link RowAssembler} for key-value tuples.
      *
-     * @param keyTuple Key tuple.
-     * @param valTuple Value tuple.
+     * @param keyStat Key tuple statistics.
+     * @param valStat Value tuple statistics.
      * @return Row assembler.
      */
-    private RowAssembler createAssembler(SchemaDescriptor schema, ChunkData keyChunk, ChunkData valChunk) {
-        final int keyOffSize = RowAssembler.vartableOffSize(keyChunk.nonNullVarlen, keyChunk.dataSize);
-        final int valOffSize = RowAssembler.vartableOffSize(valChunk.nonNullVarlen, valChunk.dataSize);
-
-        int size = BinaryRow.HEADER_SIZE +
-            (keyChunk.hasNulls ? schema.keyColumns().nullMapSize() : 0) +
-            RowAssembler.vartableSize(valChunk.nonNullVarlen, valOffSize) +
-            keyChunk.dataSize +
-            (valChunk.hasNulls ? schema.valueColumns().nullMapSize() : 0) +
-            RowAssembler.vartableSize(valChunk.nonNullVarlen, valOffSize) +
-            valChunk.dataSize;
-
-        return new RowAssembler(schema, size, keyOffSize, keyChunk.nonNullVarlen, valOffSize, valChunk.nonNullVarlen);
+    private RowAssembler createAssembler(SchemaDescriptor schema, TupleStatistics keyStat, TupleStatistics valStat) {
+        return new RowAssembler(
+            schema,
+            keyStat.payloadLen,
+            keyStat.hasNulls,
+            keyStat.nonNullVarlen,
+            valStat.payloadLen,
+            valStat.hasNulls,
+            valStat.nonNullVarlen);
     }
 
     /**
@@ -234,4 +226,18 @@ public class TupleMarshallerImpl implements TupleMarshaller {
                 throw new IllegalStateException("Unexpected value: " + col.type());
         }
     }
+
+    /**
+     * Tuple statistics record.
+     */
+    private static class TupleStatistics {
+        /** Tuple has nulls. */
+        boolean hasNulls;
+
+        /** Payload length in bytes. */
+        int payloadLen;
+
+        /** Number of non-null varlen columns. */
+        int nonNullVarlen;
+    }
 }
diff --git a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/PartitionCommandListenerTest.java b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/PartitionCommandListenerTest.java
index 8621d20..a66265c 100644
--- a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/PartitionCommandListenerTest.java
+++ b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/PartitionCommandListenerTest.java
@@ -28,9 +28,9 @@ import org.apache.ignite.internal.schema.BinaryRow;
 import org.apache.ignite.internal.schema.ByteBufferRow;
 import org.apache.ignite.internal.schema.Column;
 import org.apache.ignite.internal.schema.NativeTypes;
-import org.apache.ignite.internal.schema.Row;
-import org.apache.ignite.internal.schema.RowAssembler;
 import org.apache.ignite.internal.schema.SchemaDescriptor;
+import org.apache.ignite.internal.schema.row.Row;
+import org.apache.ignite.internal.schema.row.RowAssembler;
 import org.apache.ignite.internal.table.distributed.command.DeleteAllCommand;
 import org.apache.ignite.internal.table.distributed.command.DeleteCommand;
 import org.apache.ignite.internal.table.distributed.command.DeleteExactAllCommand;
@@ -677,7 +677,7 @@ public class PartitionCommandListenerTest {
      * @return Row.
      */
     @NotNull private Row getTestKey(int key) {
-        RowAssembler rowBuilder = new RowAssembler(SCHEMA, 4096, 0, 0);
+        RowAssembler rowBuilder = new RowAssembler(SCHEMA, 0, 0);
 
         rowBuilder.appendInt(key);
 
@@ -690,7 +690,7 @@ public class PartitionCommandListenerTest {
      * @return Row.
      */
     @NotNull private Row getTestRow(int key, int val) {
-        RowAssembler rowBuilder = new RowAssembler(SCHEMA, 4096, 0, 0);
+        RowAssembler rowBuilder = new RowAssembler(SCHEMA, 0, 0);
 
         rowBuilder.appendInt(key);
         rowBuilder.appendInt(val);