You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by am...@apache.org on 2021/06/09 19:36:13 UTC

[ignite-3] 01/15: Add tiny, normal, and large chunk formats.

This is an automated email from the ASF dual-hosted git repository.

amashenkov pushed a commit to branch ignite-14743
in repository https://gitbox.apache.org/repos/asf/ignite-3.git

commit b0bc9e531233e5eb811227c1731b5f2e523b4204
Author: Andrew Mashenkov <an...@gmail.com>
AuthorDate: Fri May 28 21:53:05 2021 +0300

    Add tiny, normal, and large chunk formats.
---
 .../apache/ignite/internal/schema/BinaryRow.java   |  17 +++
 .../ignite/internal/schema/ByteBufferRow.java      |  16 ++-
 .../org/apache/ignite/internal/schema/Column.java  |   2 +
 .../org/apache/ignite/internal/schema/Row.java     |  79 ++++++------
 .../ignite/internal/schema/RowAssembler.java       |  77 ++++++++++--
 .../marshaller/reflection/JavaSerializer.java      |   2 +-
 .../ignite/internal/table/TupleMarshallerImpl.java | 139 ++++++++++-----------
 7 files changed, 213 insertions(+), 119 deletions(-)

diff --git a/modules/schema/src/main/java/org/apache/ignite/internal/schema/BinaryRow.java b/modules/schema/src/main/java/org/apache/ignite/internal/schema/BinaryRow.java
index 8088491..74be9fb 100644
--- a/modules/schema/src/main/java/org/apache/ignite/internal/schema/BinaryRow.java
+++ b/modules/schema/src/main/java/org/apache/ignite/internal/schema/BinaryRow.java
@@ -28,16 +28,25 @@ import java.nio.ByteBuffer;
 public interface BinaryRow {
     /** */
     int SCHEMA_VERSION_OFFSET = 0;
+
     /** */
     int FLAGS_FIELD_OFFSET = SCHEMA_VERSION_OFFSET + 2 /* version length */;
+
     /** */
     int KEY_HASH_FIELD_OFFSET = FLAGS_FIELD_OFFSET + 2 /* flags length */;
+
     /** */
     int KEY_CHUNK_OFFSET = KEY_HASH_FIELD_OFFSET + 4 /* hash length */;
+
+    /** */
+    int HEADER_SIZE = KEY_CHUNK_OFFSET;
+
     /** */
     int CHUNK_LEN_FIELD_SIZE = 4;
+
     /** */
     int VARLEN_TABLE_SIZE_FIELD_SIZE = 2;
+
     /** */
     int VARLEN_COLUMN_OFFSET_FIELD_SIZE = 2;
 
@@ -148,6 +157,14 @@ public interface BinaryRow {
         /** Flag indicates value chunk omits varlen table. */
         public static final int OMIT_VAL_VARTBL_FLAG = 1 << 4;
 
+        public static final int KEY_TYNY_FORMAT = 1 << 5;
+
+        public static final int KEY_LARGE_ROW_FORMAT = 1 << 6;
+
+        public static final int VAL_TYNY_FORMAT = 1 << 7;
+
+        public static final int VAL_LARGE_FORMAT = 1 << 8;
+
         /** Stub. */
         private RowFlags() {
             // No-op.
diff --git a/modules/schema/src/main/java/org/apache/ignite/internal/schema/ByteBufferRow.java b/modules/schema/src/main/java/org/apache/ignite/internal/schema/ByteBufferRow.java
index 6ae8889..9c9c81e 100644
--- a/modules/schema/src/main/java/org/apache/ignite/internal/schema/ByteBufferRow.java
+++ b/modules/schema/src/main/java/org/apache/ignite/internal/schema/ByteBufferRow.java
@@ -129,8 +129,12 @@ public class ByteBufferRow implements BinaryRow {
 
     /** {@inheritDoc} */
     @Override public ByteBuffer keySlice() {
+        final short flags = readShort(FLAGS_FIELD_OFFSET);
+
         final int off = KEY_CHUNK_OFFSET;
-        final int len = readInteger(off);
+        final int len = (flags & RowFlags.KEY_LARGE_ROW_FORMAT) != 0 ? readInteger(off) :
+            (flags & RowFlags.KEY_TYNY_FORMAT) != 0 ? readByte(off) :
+                readShort(off);
 
         try {
             return buf.limit(off + len).position(off).slice();
@@ -143,8 +147,14 @@ public class ByteBufferRow implements BinaryRow {
 
     /** {@inheritDoc} */
     @Override public ByteBuffer valueSlice() {
-        int off = KEY_CHUNK_OFFSET + readInteger(KEY_CHUNK_OFFSET);
-        int len = hasValue() ? readInteger(off) : 0;
+        final short flags = readShort(FLAGS_FIELD_OFFSET);
+
+        int off = KEY_CHUNK_OFFSET +
+            ((flags & RowFlags.KEY_LARGE_ROW_FORMAT) != 0 ? readInteger(KEY_CHUNK_OFFSET) :
+            (flags & RowFlags.KEY_TYNY_FORMAT) != 0 ? readByte(KEY_CHUNK_OFFSET) : readShort(KEY_CHUNK_OFFSET));
+
+        int len = hasValue() ? (flags & RowFlags.VAL_LARGE_FORMAT) != 0 ? readInteger(off) :
+            (flags & RowFlags.VAL_TYNY_FORMAT) != 0 ? readByte(off) : readShort(off) : 0;
 
         try {
             return buf.limit(off + len).position(off).slice();
diff --git a/modules/schema/src/main/java/org/apache/ignite/internal/schema/Column.java b/modules/schema/src/main/java/org/apache/ignite/internal/schema/Column.java
index 815f469..ecc0a85 100644
--- a/modules/schema/src/main/java/org/apache/ignite/internal/schema/Column.java
+++ b/modules/schema/src/main/java/org/apache/ignite/internal/schema/Column.java
@@ -19,6 +19,7 @@ package org.apache.ignite.internal.schema;
 
 import java.io.Serializable;
 import java.util.function.Supplier;
+import org.apache.ignite.internal.tostring.IgniteToStringExclude;
 import org.apache.ignite.internal.tostring.S;
 import org.jetbrains.annotations.NotNull;
 
@@ -50,6 +51,7 @@ public class Column implements Comparable<Column>, Serializable {
     /**
      * Default value supplier.
      */
+    @IgniteToStringExclude
     private final Supplier<Object> defValSup;
 
     /**
diff --git a/modules/schema/src/main/java/org/apache/ignite/internal/schema/Row.java b/modules/schema/src/main/java/org/apache/ignite/internal/schema/Row.java
index a2fa63e..4ed78b0 100644
--- a/modules/schema/src/main/java/org/apache/ignite/internal/schema/Row.java
+++ b/modules/schema/src/main/java/org/apache/ignite/internal/schema/Row.java
@@ -39,14 +39,6 @@ public class Row implements BinaryRow {
     private final BinaryRow row;
 
     /**
-     * @param itemIdx Varlen table item index.
-     * @return Varlen item offset.
-     */
-    public static int varlenItemOffset(int itemIdx) {
-        return VARLEN_TABLE_SIZE_FIELD_SIZE + itemIdx * VARLEN_COLUMN_OFFSET_FIELD_SIZE;
-    }
-
-    /**
      * Constructor.
      *
      * @param schema Schema.
@@ -60,6 +52,14 @@ public class Row implements BinaryRow {
     }
 
     /**
+     * @param itemIdx Varlen table item index.
+     * @return Varlen item offset.
+     */
+    private int varlenItemOffset(int itemIdx, int itemSize) {
+        return itemSize + itemIdx * itemSize;
+    }
+
+    /**
      * @return Row schema.
      */
     public SchemaDescriptor rowSchema() {
@@ -347,13 +347,17 @@ public class Row implements BinaryRow {
 
         final short flags = readShort(FLAGS_FIELD_OFFSET);
 
+        int size = keyCol ?
+            ((flags & RowFlags.KEY_TYNY_FORMAT) != 0) ? 1 : (flags & RowFlags.KEY_LARGE_ROW_FORMAT) != 0 ? 4 : 2 :
+            (flags & RowFlags.VAL_TYNY_FORMAT) != 0 ? 1 : (flags & RowFlags.VAL_LARGE_FORMAT) != 0 ? 4 : 2;
+
         int off = KEY_CHUNK_OFFSET;
 
         if (!keyCol) {
             assert (flags & RowFlags.NO_VALUE_FLAG) == 0;
 
             // Jump to the next chunk, the size of the first chunk is written at the chunk start.
-            off += readInteger(off);
+            off += readAdaptiveSizedItem(off, size);
 
             // Adjust the column index according to the number of key columns.
             colIdx -= schema.keyColumns().length();
@@ -368,14 +372,14 @@ public class Row implements BinaryRow {
         boolean hasVarTable = ((keyCol ? RowFlags.OMIT_KEY_VARTBL_FLAG : RowFlags.OMIT_VAL_VARTBL_FLAG) & flags) == 0;
         boolean hasNullMap = ((keyCol ? RowFlags.OMIT_KEY_NULL_MAP_FLAG : RowFlags.OMIT_VAL_NULL_MAP_FLAG) & flags) == 0;
 
-        if (hasNullMap && isNull(off, colIdx))
+        if (hasNullMap && isNull(off + size, colIdx))
             return -1;
 
         assert hasVarTable || type.fixedLength();
 
         return type.fixedLength() ?
-            fixlenColumnOffset(cols, off, colIdx, hasVarTable, hasNullMap) :
-            varlenColumnOffsetAndLength(cols, off, colIdx, hasNullMap);
+            fixlenColumnOffset(cols, off, colIdx, hasVarTable, hasNullMap, size) :
+            varlenColumnOffsetAndLength(cols, off, colIdx, hasNullMap, size);
     }
 
     /**
@@ -391,13 +395,11 @@ public class Row implements BinaryRow {
     /**
      * Checks the row's null map for the given column index in the chunk.
      *
-     * @param baseOff Offset of the chunk start in the row.
+     * @param nullMapOff Null map offset
      * @param idx Offset of the column in the chunk.
      * @return {@code true} if the column value is {@code null}.
      */
-    private boolean isNull(int baseOff, int idx) {
-        int nullMapOff = nullMapOffset(baseOff);
-
+    private boolean isNull(int nullMapOff, int idx) {
         int nullByte = idx / 8;
         int posInByte = idx % 8;
 
@@ -439,17 +441,19 @@ public class Row implements BinaryRow {
      * @param baseOff Chunk base offset.
      * @param idx Column index in the chunk.
      * @param hasNullMap Has null map flag.
+     * @param adaptiveItemSize
      * @return Encoded offset (from the row start) and length of the column with the given index.
      */
-    private long varlenColumnOffsetAndLength(Columns cols, int baseOff, int idx, boolean hasNullMap) {
-        int vartableOff = baseOff + CHUNK_LEN_FIELD_SIZE;
+    private long varlenColumnOffsetAndLength(Columns cols, int baseOff, int idx, boolean hasNullMap,
+        int adaptiveItemSize) {
+        int vartableOff = baseOff + adaptiveItemSize /* Chunk len field size. */;
 
         int numNullsBefore = 0;
 
         if (hasNullMap) {
             vartableOff += cols.nullMapSize();
 
-            int nullMapOff = nullMapOffset(baseOff);
+            int nullMapOff = baseOff + adaptiveItemSize;
 
             int nullStartByte = cols.firstVarlengthColumn() / 8;
             int startBitInByte = cols.firstVarlengthColumn() % 8;
@@ -473,16 +477,16 @@ public class Row implements BinaryRow {
         }
 
         idx -= cols.numberOfFixsizeColumns() + numNullsBefore;
-        int vartableSize = readShort(vartableOff);
+        int vartableSize = readAdaptiveSizedItem(vartableOff, adaptiveItemSize);
 
         // Offset of idx-th column is from base offset.
-        int resOff = readShort(vartableOff + varlenItemOffset(idx));
+        int resOff = readAdaptiveSizedItem(vartableOff + varlenItemOffset(idx, adaptiveItemSize), adaptiveItemSize);
 
         long len = (idx == vartableSize - 1) ?
             // totalLength - columnStartOffset
-            readInteger(baseOff) - resOff :
+            readAdaptiveSizedItem(baseOff, adaptiveItemSize) - resOff :
             // nextColumnStartOffset - columnStartOffset
-            readShort(vartableOff + varlenItemOffset(idx + 1)) - resOff;
+            readAdaptiveSizedItem(vartableOff + varlenItemOffset(idx + 1, adaptiveItemSize), adaptiveItemSize) - resOff;
 
         return (len << 32) | (resOff + baseOff);
     }
@@ -496,12 +500,14 @@ public class Row implements BinaryRow {
      * @param idx Column index in the chunk.
      * @param hasVarTbl Has varlen table flag.
      * @param hasNullMap Has null map flag.
+     * @param adaptiveItemSize
      * @return Encoded offset (from the row start) of the requested fixlen column.
      */
-    int fixlenColumnOffset(Columns cols, int baseOff, int idx, boolean hasVarTbl, boolean hasNullMap) {
+    int fixlenColumnOffset(Columns cols, int baseOff, int idx, boolean hasVarTbl, boolean hasNullMap,
+        int adaptiveItemSize) {
         int colOff = 0;
 
-        int payloadOff = baseOff + CHUNK_LEN_FIELD_SIZE;
+        int payloadOff = baseOff + adaptiveItemSize /* Chunk len field size. */;
 
         // Calculate fixlen column offset.
         {
@@ -517,9 +523,9 @@ public class Row implements BinaryRow {
 
                 // Fold offset based on the whole map bytes in the schema
                 for (int i = 0; i < colByteIdx; i++)
-                    colOff += cols.foldFixedLength(i, readByte(nullMapOffset(baseOff) + i));
+                    colOff += cols.foldFixedLength(i, readByte(baseOff + adaptiveItemSize + i));
 
-                colOff += cols.foldFixedLength(colByteIdx, readByte(nullMapOffset(baseOff) + colByteIdx) | mask);
+                colOff += cols.foldFixedLength(colByteIdx, readByte(baseOff + adaptiveItemSize + colByteIdx) | mask);
             }
             else {
                 for (int i = 0; i < colByteIdx; i++)
@@ -530,20 +536,23 @@ public class Row implements BinaryRow {
         }
 
         if (hasVarTbl) {
-            short verlenItems = readShort(payloadOff);
+            int verlenItems = readAdaptiveSizedItem(payloadOff, adaptiveItemSize);
 
-            payloadOff += varlenItemOffset(verlenItems);
+            payloadOff += varlenItemOffset(verlenItems, adaptiveItemSize);
         }
 
         return payloadOff + colOff;
     }
 
-    /**
-     * @param baseOff Chunk base offset.
-     * @return Null map offset from the row start for the chunk with the given base.
-     */
-    private int nullMapOffset(int baseOff) {
-        return baseOff + CHUNK_LEN_FIELD_SIZE;
+    public int readAdaptiveSizedItem(int off, int size) {
+        switch (size) {
+            case Byte.BYTES:
+                return row.readByte(off);
+            case Short.BYTES:
+                return row.readShort(off);
+            default:
+                return row.readInteger(off);
+        }
     }
 
     /** {@inheritDoc} */
diff --git a/modules/schema/src/main/java/org/apache/ignite/internal/schema/RowAssembler.java b/modules/schema/src/main/java/org/apache/ignite/internal/schema/RowAssembler.java
index 770ab6d..021d72b 100644
--- a/modules/schema/src/main/java/org/apache/ignite/internal/schema/RowAssembler.java
+++ b/modules/schema/src/main/java/org/apache/ignite/internal/schema/RowAssembler.java
@@ -51,6 +51,8 @@ public class RowAssembler {
     /** Target byte buffer to write to. */
     private final ExpandableByteBuf buf;
 
+    private final int valOffSize;
+
     /** Current columns chunk. */
     private Columns curCols;
 
@@ -72,6 +74,8 @@ public class RowAssembler {
     /** Offset of the varlen table for current chunk. */
     private int varlenTblChunkOff;
 
+    private int adaptiveItemSize;
+
     /** Row hashcode. */
     private int keyHash;
 
@@ -155,6 +159,14 @@ public class RowAssembler {
 
         return size + nonNullVarlenSize;
     }
+    public RowAssembler(
+        SchemaDescriptor schema,
+        int size,
+        int nonNullVarlenKeyCols,
+        int nonNullVarlenValCols
+    ) {
+        this(schema, size, 2, nonNullVarlenKeyCols, 2, nonNullVarlenValCols);
+    }
 
     /**
      * @param schema Row schema.
@@ -166,18 +178,21 @@ public class RowAssembler {
     public RowAssembler(
         SchemaDescriptor schema,
         int size,
+        int keyOffSize,
         int nonNullVarlenKeyCols,
+        int valOffSize,
         int nonNullVarlenValCols
     ) {
         this.schema = schema;
         this.nonNullVarlenValCols = nonNullVarlenValCols;
+        this.valOffSize = valOffSize;
 
         curCols = schema.keyColumns();
         flags = 0;
         keyHash = 0;
         strEncoder = null;
 
-        initOffsets(BinaryRow.KEY_CHUNK_OFFSET, nonNullVarlenKeyCols);
+        initOffsets(BinaryRow.KEY_CHUNK_OFFSET, keyOffSize, nonNullVarlenKeyCols);
 
         if (schema.keyColumns().nullMapSize() == 0)
             flags |= RowFlags.OMIT_KEY_NULL_MAP_FLAG;
@@ -185,6 +200,16 @@ public class RowAssembler {
         if (schema.valueColumns().nullMapSize() == 0)
             flags |= RowFlags.OMIT_VAL_NULL_MAP_FLAG;
 
+        if (keyOffSize == 1)
+            flags |= RowFlags.KEY_TYNY_FORMAT;
+        else if (keyOffSize != 2)
+            flags |= RowFlags.KEY_LARGE_ROW_FORMAT;
+
+        if (valOffSize == 1)
+            flags |= RowFlags.VAL_TYNY_FORMAT;
+        else if (valOffSize != 2)
+            flags |= RowFlags.VAL_LARGE_FORMAT;
+
         buf = new ExpandableByteBuf(size);
 
         buf.putShort(0, (short)schema.version());
@@ -192,7 +217,16 @@ public class RowAssembler {
         if (nonNullVarlenKeyCols == 0)
             flags |= OMIT_KEY_VARTBL_FLAG;
         else
-            buf.putShort(varlenTblChunkOff, (short)nonNullVarlenKeyCols);
+            writeAdaptiveItem(varlenTblChunkOff, (short)nonNullVarlenKeyCols);
+    }
+
+    public static int vartableOffSize(int vartableItems, int dataSize) {
+        if (dataSize + vartableItems + 2 < (1 << 8))
+            return 1;
+        if (dataSize + (vartableItems + 2) * 2 < (1 << 16))
+            return 2;
+        else
+            return 4;
     }
 
     /**
@@ -435,7 +469,22 @@ public class RowAssembler {
         assert (flags & (baseOff == BinaryRow.KEY_CHUNK_OFFSET ? OMIT_KEY_VARTBL_FLAG : OMIT_VAL_VARTBL_FLAG)) == 0 :
             "Illegal writing of varlen when 'omit vartable' flag is set for a chunk.";
 
-        buf.putShort(varlenTblChunkOff + Row.varlenItemOffset(tblEntryIdx), (short)off);
+        writeAdaptiveItem(varlenTblChunkOff + vartableItemOffset(tblEntryIdx), off);
+    }
+
+    private void writeAdaptiveItem(int off, int val) {
+        switch (adaptiveItemSize) {
+            case Byte.BYTES:
+                buf.put(off, (byte)val);
+
+                return;
+            case Short.BYTES:
+                buf.putShort(off, (short)val);
+
+                return;
+            default:
+                buf.putInt(off, val);
+        }
     }
 
     /**
@@ -505,19 +554,19 @@ public class RowAssembler {
         if (curCol == curCols.length()) {
             int chunkLen = curOff - baseOff;
 
-            buf.putInt(baseOff, chunkLen);
+            writeAdaptiveItem(baseOff, chunkLen);
 
             if (schema.valueColumns() == curCols)
                 return; // No more columns.
 
             curCols = schema.valueColumns(); // Switch key->value columns.
 
-            initOffsets(baseOff + chunkLen, nonNullVarlenValCols);
+            initOffsets(baseOff + chunkLen, valOffSize, nonNullVarlenValCols);
 
             if (nonNullVarlenValCols == 0)
                 flags |= OMIT_VAL_VARTBL_FLAG;
             else
-                buf.putShort(varlenTblChunkOff, (short)nonNullVarlenValCols);
+                writeAdaptiveItem(varlenTblChunkOff, (short)nonNullVarlenValCols);
         }
     }
 
@@ -525,16 +574,26 @@ public class RowAssembler {
      * @param base Chunk base offset.
      * @param nonNullVarlenCols Number of non-null varlen columns.
      */
-    private void initOffsets(int base, int nonNullVarlenCols) {
+    private void initOffsets(int base, int adaptiveItemSize, int nonNullVarlenCols) {
         baseOff = base;
 
+        this.adaptiveItemSize = adaptiveItemSize;
+
         curCol = 0;
         curVarlenTblEntry = 0;
 
-        nullMapOff = baseOff + BinaryRow.CHUNK_LEN_FIELD_SIZE;
+        nullMapOff = baseOff + adaptiveItemSize /* chunk length. */;
         varlenTblChunkOff = nullMapOff + curCols.nullMapSize();
 
-        curOff = varlenTblChunkOff + varlenTableChunkSize(nonNullVarlenCols);
+        curOff = varlenTblChunkOff + vartableSize(nonNullVarlenCols, adaptiveItemSize);
+    }
+
+    public int vartableItemOffset(int nonNullVarlenCols) {
+        return adaptiveItemSize + nonNullVarlenCols * adaptiveItemSize;
+    }
+
+    public static int vartableSize(int items, int itemSize) {
+        return items == 0 ? 0 : itemSize + items * itemSize;
     }
 
     /**
diff --git a/modules/schema/src/main/java/org/apache/ignite/internal/schema/marshaller/reflection/JavaSerializer.java b/modules/schema/src/main/java/org/apache/ignite/internal/schema/marshaller/reflection/JavaSerializer.java
index 31bad5e..a36ef94 100644
--- a/modules/schema/src/main/java/org/apache/ignite/internal/schema/marshaller/reflection/JavaSerializer.java
+++ b/modules/schema/src/main/java/org/apache/ignite/internal/schema/marshaller/reflection/JavaSerializer.java
@@ -89,7 +89,7 @@ public class JavaSerializer extends AbstractSerializer {
             schema.keyColumns(), keyStat.nonNullCols, keyStat.nonNullColsSize,
             schema.valueColumns(), valStat.nonNullCols, valStat.nonNullColsSize);
 
-        return new RowAssembler(schema, size, keyStat.nonNullCols, valStat.nonNullCols);
+        return new RowAssembler(schema, size, 2, keyStat.nonNullCols, 2, valStat.nonNullCols);
     }
 
     /**
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/TupleMarshallerImpl.java b/modules/table/src/main/java/org/apache/ignite/internal/table/TupleMarshallerImpl.java
index c944c3c..7a6aed4 100644
--- a/modules/table/src/main/java/org/apache/ignite/internal/table/TupleMarshallerImpl.java
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/TupleMarshallerImpl.java
@@ -19,8 +19,11 @@ package org.apache.ignite.internal.table;
 
 import java.util.Arrays;
 import java.util.BitSet;
+import java.util.HashMap;
+import java.util.Map;
 import java.util.Objects;
 import java.util.UUID;
+import org.apache.ignite.internal.schema.BinaryRow;
 import org.apache.ignite.internal.schema.ByteBufferRow;
 import org.apache.ignite.internal.schema.Column;
 import org.apache.ignite.internal.schema.Columns;
@@ -62,28 +65,72 @@ public class TupleMarshallerImpl implements TupleMarshaller {
 
         validate(keyTuple, schema.keyColumns());
 
-        final RowAssembler rowBuilder = createAssembler(schema, keyTuple, valTuple);
+        ChunkData keyChunk = chunkData(schema.keyColumns(), keyTuple);
+        ChunkData valChunk = chunkData(schema.valueColumns(), valTuple);
+
+        final RowAssembler rowBuilder = createAssembler(schema, keyChunk, valChunk);
 
         for (int i = 0; i < schema.keyColumns().length(); i++) {
             final Column col = schema.keyColumns().column(i);
 
-            writeColumn(keyTuple, col, rowBuilder);
+            writeColumn(rowBuilder, col, keyChunk.data.get(i));
         }
 
-        if (valTuple != null) {
+        if (valChunk.data != null) {
             validate(valTuple, schema.valueColumns());
 
             for (int i = 0; i < schema.valueColumns().length(); i++) {
                 final Column col = schema.valueColumns().column(i);
 
-                writeColumn(valTuple, col, rowBuilder);
+                writeColumn(rowBuilder, col, valChunk.data.get(i));
             }
         }
 
         return new Row(schema, new ByteBufferRow(rowBuilder.build()));
     }
 
-    /** */
+    private ChunkData chunkData(Columns cols, Tuple tuple) {
+        if (tuple == null)
+            return new ChunkData();
+
+        ChunkData chunk = new ChunkData();
+
+        chunk.data = new HashMap<>();
+
+        for (int i = 0; i < cols.length(); i++) {
+            Column col = cols.column(i);
+
+            Object val = (tuple.contains(col.name())) ? tuple.value(col.name()) : col.defaultValue();
+
+            if (val == null)
+                chunk.hasNulls = true;
+            else {
+                chunk.data.put(i, val);
+
+                if (col.type().spec().fixedLength())
+                    chunk.dataSize += col.type().sizeInBytes();
+                else {
+                    chunk.nonNullVarlen++;
+
+                    chunk.dataSize += getValueSize(val, col.type());
+                }
+            }
+        }
+
+        return chunk;
+    }
+
+    class ChunkData {
+        public boolean hasNulls;
+        public int dataSize;
+        public int nonNullVarlen;
+        Map<Integer, Object> data;
+
+    }
+
+    /**
+     *
+     */
     private void validate(Tuple tuple, Columns columns) {
         if (tuple instanceof TupleBuilderImpl) {
             TupleBuilderImpl t0 = (TupleBuilderImpl)tuple;
@@ -105,31 +152,27 @@ public class TupleMarshallerImpl implements TupleMarshaller {
      * @param valTuple Value tuple.
      * @return Row assembler.
      */
-    private RowAssembler createAssembler(SchemaDescriptor schema, Tuple keyTuple, Tuple valTuple) {
-        final ObjectStatistic keyStat = collectObjectStats(schema.keyColumns(), keyTuple);
-        final ObjectStatistic valStat = collectObjectStats(schema.valueColumns(), valTuple);
-
-        int size = RowAssembler.rowSize(
-            schema.keyColumns(), keyStat.nonNullCols, keyStat.nonNullColsSize,
-            schema.valueColumns(), valStat.nonNullCols, valStat.nonNullColsSize);
-
-        return new RowAssembler(schema, size, keyStat.nonNullCols, valStat.nonNullCols);
+    private RowAssembler createAssembler(SchemaDescriptor schema, ChunkData keyChunk, ChunkData valChunk) {
+        final int keyOffSize = RowAssembler.vartableOffSize(keyChunk.nonNullVarlen, keyChunk.dataSize);
+        final int valOffSize = RowAssembler.vartableOffSize(valChunk.nonNullVarlen, valChunk.dataSize);
+
+        int size = BinaryRow.HEADER_SIZE +
+            (keyChunk.hasNulls ? schema.keyColumns().nullMapSize() : 0) +
+            RowAssembler.vartableSize(valChunk.nonNullVarlen, valOffSize) +
+            keyChunk.dataSize +
+            (valChunk.hasNulls ? schema.valueColumns().nullMapSize() : 0) +
+            RowAssembler.vartableSize(valChunk.nonNullVarlen, valOffSize) +
+            valChunk.dataSize;
+
+        return new RowAssembler(schema, size, keyOffSize, keyChunk.nonNullVarlen, valOffSize, valChunk.nonNullVarlen);
     }
 
     /**
-     * @param tup Tuple.
-     * @param col Column.
      * @param rowAsm Row assembler.
+     * @param col Column.
+     * @param val Value.
      */
-    private void writeColumn(Tuple tup, Column col, RowAssembler rowAsm) {
-        Object val;
-
-        if (!tup.contains(col.name()))
-            val = col.defaultValue();
-        else {
-            val = tup.value(col.name());
-        }
-
+    private void writeColumn(RowAssembler rowAsm, Column col, Object val) {
         if (val == null) {
             rowAsm.appendNull();
 
@@ -191,50 +234,4 @@ public class TupleMarshallerImpl implements TupleMarshaller {
                 throw new IllegalStateException("Unexpected value: " + col.type());
         }
     }
-
-    /**
-     * Reads object fields and gather statistic.
-     *
-     * @param cols Schema columns.
-     * @param tup Tuple.
-     * @return Object statistic.
-     */
-    private ObjectStatistic collectObjectStats(Columns cols, Tuple tup) {
-        if (tup == null || !cols.hasVarlengthColumns())
-            return new ObjectStatistic(0, 0);
-
-        int cnt = 0;
-        int size = 0;
-
-        for (int i = cols.firstVarlengthColumn(); i < cols.length(); i++) {
-            Column col = cols.column(i);
-
-            final Object val = tup.contains(col.name()) ? tup.value(col.name()) : col.defaultValue();
-
-            if (val == null || cols.column(i).type().spec().fixedLength())
-                continue;
-
-            size += getValueSize(val, cols.column(i).type());
-            cnt++;
-        }
-
-        return new ObjectStatistic(cnt, size);
-    }
-
-    /**
-     * Object statistic.
-     */
-    private static class ObjectStatistic {
-        /** Non-null fields of varlen type. */
-        int nonNullCols;
-
-        /** Length of all non-null fields of varlen types. */
-        int nonNullColsSize;
-
-        /** Constructor. */
-        ObjectStatistic(int nonNullCols, int nonNullColsSize) {
-            this.nonNullCols = nonNullCols;
-            this.nonNullColsSize = nonNullColsSize;
-        }
-    }
 }