You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by am...@apache.org on 2021/06/09 19:36:13 UTC
[ignite-3] 01/15: Add tiny,normal,large chunk formats.
This is an automated email from the ASF dual-hosted git repository.
amashenkov pushed a commit to branch ignite-14743
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
commit b0bc9e531233e5eb811227c1731b5f2e523b4204
Author: Andrew Mashenkov <an...@gmail.com>
AuthorDate: Fri May 28 21:53:05 2021 +0300
Add tiny,normal,large chunk formats.
---
.../apache/ignite/internal/schema/BinaryRow.java | 17 +++
.../ignite/internal/schema/ByteBufferRow.java | 16 ++-
.../org/apache/ignite/internal/schema/Column.java | 2 +
.../org/apache/ignite/internal/schema/Row.java | 79 ++++++------
.../ignite/internal/schema/RowAssembler.java | 77 ++++++++++--
.../marshaller/reflection/JavaSerializer.java | 2 +-
.../ignite/internal/table/TupleMarshallerImpl.java | 139 ++++++++++-----------
7 files changed, 213 insertions(+), 119 deletions(-)
diff --git a/modules/schema/src/main/java/org/apache/ignite/internal/schema/BinaryRow.java b/modules/schema/src/main/java/org/apache/ignite/internal/schema/BinaryRow.java
index 8088491..74be9fb 100644
--- a/modules/schema/src/main/java/org/apache/ignite/internal/schema/BinaryRow.java
+++ b/modules/schema/src/main/java/org/apache/ignite/internal/schema/BinaryRow.java
@@ -28,16 +28,25 @@ import java.nio.ByteBuffer;
public interface BinaryRow {
/** */
int SCHEMA_VERSION_OFFSET = 0;
+
/** */
int FLAGS_FIELD_OFFSET = SCHEMA_VERSION_OFFSET + 2 /* version length */;
+
/** */
int KEY_HASH_FIELD_OFFSET = FLAGS_FIELD_OFFSET + 2 /* flags length */;
+
/** */
int KEY_CHUNK_OFFSET = KEY_HASH_FIELD_OFFSET + 4 /* hash length */;
+
+ /** */
+ int HEADER_SIZE = KEY_CHUNK_OFFSET;
+
/** */
int CHUNK_LEN_FIELD_SIZE = 4;
+
/** */
int VARLEN_TABLE_SIZE_FIELD_SIZE = 2;
+
/** */
int VARLEN_COLUMN_OFFSET_FIELD_SIZE = 2;
@@ -148,6 +157,14 @@ public interface BinaryRow {
/** Flag indicates value chunk omits varlen table. */
public static final int OMIT_VAL_VARTBL_FLAG = 1 << 4;
+ public static final int KEY_TYNY_FORMAT = 1 << 5;
+
+ public static final int KEY_LARGE_ROW_FORMAT = 1 << 6;
+
+ public static final int VAL_TYNY_FORMAT = 1 << 7;
+
+ public static final int VAL_LARGE_FORMAT = 1 << 8;
+
/** Stub. */
private RowFlags() {
// No-op.
diff --git a/modules/schema/src/main/java/org/apache/ignite/internal/schema/ByteBufferRow.java b/modules/schema/src/main/java/org/apache/ignite/internal/schema/ByteBufferRow.java
index 6ae8889..9c9c81e 100644
--- a/modules/schema/src/main/java/org/apache/ignite/internal/schema/ByteBufferRow.java
+++ b/modules/schema/src/main/java/org/apache/ignite/internal/schema/ByteBufferRow.java
@@ -129,8 +129,12 @@ public class ByteBufferRow implements BinaryRow {
/** {@inheritDoc} */
@Override public ByteBuffer keySlice() {
+ final short flags = readShort(FLAGS_FIELD_OFFSET);
+
final int off = KEY_CHUNK_OFFSET;
- final int len = readInteger(off);
+ final int len = (flags & RowFlags.KEY_LARGE_ROW_FORMAT) != 0 ? readInteger(off) :
+ (flags & RowFlags.KEY_TYNY_FORMAT) != 0 ? readByte(off) :
+ readShort(off);
try {
return buf.limit(off + len).position(off).slice();
@@ -143,8 +147,14 @@ public class ByteBufferRow implements BinaryRow {
/** {@inheritDoc} */
@Override public ByteBuffer valueSlice() {
- int off = KEY_CHUNK_OFFSET + readInteger(KEY_CHUNK_OFFSET);
- int len = hasValue() ? readInteger(off) : 0;
+ final short flags = readShort(FLAGS_FIELD_OFFSET);
+
+ int off = KEY_CHUNK_OFFSET +
+ ((flags & RowFlags.KEY_LARGE_ROW_FORMAT) != 0 ? readInteger(KEY_CHUNK_OFFSET) :
+ (flags & RowFlags.KEY_TYNY_FORMAT) != 0 ? readByte(KEY_CHUNK_OFFSET) : readShort(KEY_CHUNK_OFFSET));
+
+ int len = hasValue() ? (flags & RowFlags.VAL_LARGE_FORMAT) != 0 ? readInteger(off) :
+ (flags & RowFlags.VAL_TYNY_FORMAT) != 0 ? readByte(off) : readShort(off) : 0;
try {
return buf.limit(off + len).position(off).slice();
diff --git a/modules/schema/src/main/java/org/apache/ignite/internal/schema/Column.java b/modules/schema/src/main/java/org/apache/ignite/internal/schema/Column.java
index 815f469..ecc0a85 100644
--- a/modules/schema/src/main/java/org/apache/ignite/internal/schema/Column.java
+++ b/modules/schema/src/main/java/org/apache/ignite/internal/schema/Column.java
@@ -19,6 +19,7 @@ package org.apache.ignite.internal.schema;
import java.io.Serializable;
import java.util.function.Supplier;
+import org.apache.ignite.internal.tostring.IgniteToStringExclude;
import org.apache.ignite.internal.tostring.S;
import org.jetbrains.annotations.NotNull;
@@ -50,6 +51,7 @@ public class Column implements Comparable<Column>, Serializable {
/**
* Default value supplier.
*/
+ @IgniteToStringExclude
private final Supplier<Object> defValSup;
/**
diff --git a/modules/schema/src/main/java/org/apache/ignite/internal/schema/Row.java b/modules/schema/src/main/java/org/apache/ignite/internal/schema/Row.java
index a2fa63e..4ed78b0 100644
--- a/modules/schema/src/main/java/org/apache/ignite/internal/schema/Row.java
+++ b/modules/schema/src/main/java/org/apache/ignite/internal/schema/Row.java
@@ -39,14 +39,6 @@ public class Row implements BinaryRow {
private final BinaryRow row;
/**
- * @param itemIdx Varlen table item index.
- * @return Varlen item offset.
- */
- public static int varlenItemOffset(int itemIdx) {
- return VARLEN_TABLE_SIZE_FIELD_SIZE + itemIdx * VARLEN_COLUMN_OFFSET_FIELD_SIZE;
- }
-
- /**
* Constructor.
*
* @param schema Schema.
@@ -60,6 +52,14 @@ public class Row implements BinaryRow {
}
/**
+ * @param itemIdx Varlen table item index.
+ * @return Varlen item offset.
+ */
+ private int varlenItemOffset(int itemIdx, int itemSize) {
+ return itemSize + itemIdx * itemSize;
+ }
+
+ /**
* @return Row schema.
*/
public SchemaDescriptor rowSchema() {
@@ -347,13 +347,17 @@ public class Row implements BinaryRow {
final short flags = readShort(FLAGS_FIELD_OFFSET);
+ int size = keyCol ?
+ ((flags & RowFlags.KEY_TYNY_FORMAT) != 0) ? 1 : (flags & RowFlags.KEY_LARGE_ROW_FORMAT) != 0 ? 4 : 2 :
+ (flags & RowFlags.VAL_TYNY_FORMAT) != 0 ? 1 : (flags & RowFlags.VAL_LARGE_FORMAT) != 0 ? 4 : 2;
+
int off = KEY_CHUNK_OFFSET;
if (!keyCol) {
assert (flags & RowFlags.NO_VALUE_FLAG) == 0;
// Jump to the next chunk, the size of the first chunk is written at the chunk start.
- off += readInteger(off);
+ off += readAdaptiveSizedItem(off, size);
// Adjust the column index according to the number of key columns.
colIdx -= schema.keyColumns().length();
@@ -368,14 +372,14 @@ public class Row implements BinaryRow {
boolean hasVarTable = ((keyCol ? RowFlags.OMIT_KEY_VARTBL_FLAG : RowFlags.OMIT_VAL_VARTBL_FLAG) & flags) == 0;
boolean hasNullMap = ((keyCol ? RowFlags.OMIT_KEY_NULL_MAP_FLAG : RowFlags.OMIT_VAL_NULL_MAP_FLAG) & flags) == 0;
- if (hasNullMap && isNull(off, colIdx))
+ if (hasNullMap && isNull(off + size, colIdx))
return -1;
assert hasVarTable || type.fixedLength();
return type.fixedLength() ?
- fixlenColumnOffset(cols, off, colIdx, hasVarTable, hasNullMap) :
- varlenColumnOffsetAndLength(cols, off, colIdx, hasNullMap);
+ fixlenColumnOffset(cols, off, colIdx, hasVarTable, hasNullMap, size) :
+ varlenColumnOffsetAndLength(cols, off, colIdx, hasNullMap, size);
}
/**
@@ -391,13 +395,11 @@ public class Row implements BinaryRow {
/**
* Checks the row's null map for the given column index in the chunk.
*
- * @param baseOff Offset of the chunk start in the row.
+ * @param nullMapOff Null map offset
* @param idx Offset of the column in the chunk.
* @return {@code true} if the column value is {@code null}.
*/
- private boolean isNull(int baseOff, int idx) {
- int nullMapOff = nullMapOffset(baseOff);
-
+ private boolean isNull(int nullMapOff, int idx) {
int nullByte = idx / 8;
int posInByte = idx % 8;
@@ -439,17 +441,19 @@ public class Row implements BinaryRow {
* @param baseOff Chunk base offset.
* @param idx Column index in the chunk.
* @param hasNullMap Has null map flag.
+ * @param adaptiveItemSize
* @return Encoded offset (from the row start) and length of the column with the given index.
*/
- private long varlenColumnOffsetAndLength(Columns cols, int baseOff, int idx, boolean hasNullMap) {
- int vartableOff = baseOff + CHUNK_LEN_FIELD_SIZE;
+ private long varlenColumnOffsetAndLength(Columns cols, int baseOff, int idx, boolean hasNullMap,
+ int adaptiveItemSize) {
+ int vartableOff = baseOff + adaptiveItemSize /* Chunk len field size. */;
int numNullsBefore = 0;
if (hasNullMap) {
vartableOff += cols.nullMapSize();
- int nullMapOff = nullMapOffset(baseOff);
+ int nullMapOff = baseOff + adaptiveItemSize;
int nullStartByte = cols.firstVarlengthColumn() / 8;
int startBitInByte = cols.firstVarlengthColumn() % 8;
@@ -473,16 +477,16 @@ public class Row implements BinaryRow {
}
idx -= cols.numberOfFixsizeColumns() + numNullsBefore;
- int vartableSize = readShort(vartableOff);
+ int vartableSize = readAdaptiveSizedItem(vartableOff, adaptiveItemSize);
// Offset of idx-th column is from base offset.
- int resOff = readShort(vartableOff + varlenItemOffset(idx));
+ int resOff = readAdaptiveSizedItem(vartableOff + varlenItemOffset(idx, adaptiveItemSize), adaptiveItemSize);
long len = (idx == vartableSize - 1) ?
// totalLength - columnStartOffset
- readInteger(baseOff) - resOff :
+ readAdaptiveSizedItem(baseOff, adaptiveItemSize) - resOff :
// nextColumnStartOffset - columnStartOffset
- readShort(vartableOff + varlenItemOffset(idx + 1)) - resOff;
+ readAdaptiveSizedItem(vartableOff + varlenItemOffset(idx + 1, adaptiveItemSize), adaptiveItemSize) - resOff;
return (len << 32) | (resOff + baseOff);
}
@@ -496,12 +500,14 @@ public class Row implements BinaryRow {
* @param idx Column index in the chunk.
* @param hasVarTbl Has varlen table flag.
* @param hasNullMap Has null map flag.
+ * @param adaptiveItemSize
* @return Encoded offset (from the row start) of the requested fixlen column.
*/
- int fixlenColumnOffset(Columns cols, int baseOff, int idx, boolean hasVarTbl, boolean hasNullMap) {
+ int fixlenColumnOffset(Columns cols, int baseOff, int idx, boolean hasVarTbl, boolean hasNullMap,
+ int adaptiveItemSize) {
int colOff = 0;
- int payloadOff = baseOff + CHUNK_LEN_FIELD_SIZE;
+ int payloadOff = baseOff + adaptiveItemSize /* Chunk len field size. */;
// Calculate fixlen column offset.
{
@@ -517,9 +523,9 @@ public class Row implements BinaryRow {
// Fold offset based on the whole map bytes in the schema
for (int i = 0; i < colByteIdx; i++)
- colOff += cols.foldFixedLength(i, readByte(nullMapOffset(baseOff) + i));
+ colOff += cols.foldFixedLength(i, readByte(baseOff + adaptiveItemSize + i));
- colOff += cols.foldFixedLength(colByteIdx, readByte(nullMapOffset(baseOff) + colByteIdx) | mask);
+ colOff += cols.foldFixedLength(colByteIdx, readByte(baseOff + adaptiveItemSize + colByteIdx) | mask);
}
else {
for (int i = 0; i < colByteIdx; i++)
@@ -530,20 +536,23 @@ public class Row implements BinaryRow {
}
if (hasVarTbl) {
- short verlenItems = readShort(payloadOff);
+ int verlenItems = readAdaptiveSizedItem(payloadOff, adaptiveItemSize);
- payloadOff += varlenItemOffset(verlenItems);
+ payloadOff += varlenItemOffset(verlenItems, adaptiveItemSize);
}
return payloadOff + colOff;
}
- /**
- * @param baseOff Chunk base offset.
- * @return Null map offset from the row start for the chunk with the given base.
- */
- private int nullMapOffset(int baseOff) {
- return baseOff + CHUNK_LEN_FIELD_SIZE;
+ public int readAdaptiveSizedItem(int off, int size) {
+ switch (size) {
+ case Byte.BYTES:
+ return row.readByte(off);
+ case Short.BYTES:
+ return row.readShort(off);
+ default:
+ return row.readInteger(off);
+ }
}
/** {@inheritDoc} */
diff --git a/modules/schema/src/main/java/org/apache/ignite/internal/schema/RowAssembler.java b/modules/schema/src/main/java/org/apache/ignite/internal/schema/RowAssembler.java
index 770ab6d..021d72b 100644
--- a/modules/schema/src/main/java/org/apache/ignite/internal/schema/RowAssembler.java
+++ b/modules/schema/src/main/java/org/apache/ignite/internal/schema/RowAssembler.java
@@ -51,6 +51,8 @@ public class RowAssembler {
/** Target byte buffer to write to. */
private final ExpandableByteBuf buf;
+ private final int valOffSize;
+
/** Current columns chunk. */
private Columns curCols;
@@ -72,6 +74,8 @@ public class RowAssembler {
/** Offset of the varlen table for current chunk. */
private int varlenTblChunkOff;
+ private int adaptiveItemSize;
+
/** Row hashcode. */
private int keyHash;
@@ -155,6 +159,14 @@ public class RowAssembler {
return size + nonNullVarlenSize;
}
+ public RowAssembler(
+ SchemaDescriptor schema,
+ int size,
+ int nonNullVarlenKeyCols,
+ int nonNullVarlenValCols
+ ) {
+ this(schema, size, 2, nonNullVarlenKeyCols, 2, nonNullVarlenValCols);
+ }
/**
* @param schema Row schema.
@@ -166,18 +178,21 @@ public class RowAssembler {
public RowAssembler(
SchemaDescriptor schema,
int size,
+ int keyOffSize,
int nonNullVarlenKeyCols,
+ int valOffSize,
int nonNullVarlenValCols
) {
this.schema = schema;
this.nonNullVarlenValCols = nonNullVarlenValCols;
+ this.valOffSize = valOffSize;
curCols = schema.keyColumns();
flags = 0;
keyHash = 0;
strEncoder = null;
- initOffsets(BinaryRow.KEY_CHUNK_OFFSET, nonNullVarlenKeyCols);
+ initOffsets(BinaryRow.KEY_CHUNK_OFFSET, keyOffSize, nonNullVarlenKeyCols);
if (schema.keyColumns().nullMapSize() == 0)
flags |= RowFlags.OMIT_KEY_NULL_MAP_FLAG;
@@ -185,6 +200,16 @@ public class RowAssembler {
if (schema.valueColumns().nullMapSize() == 0)
flags |= RowFlags.OMIT_VAL_NULL_MAP_FLAG;
+ if (keyOffSize == 1)
+ flags |= RowFlags.KEY_TYNY_FORMAT;
+ else if (keyOffSize != 2)
+ flags |= RowFlags.KEY_LARGE_ROW_FORMAT;
+
+ if (valOffSize == 1)
+ flags |= RowFlags.VAL_TYNY_FORMAT;
+ else if (valOffSize != 2)
+ flags |= RowFlags.VAL_LARGE_FORMAT;
+
buf = new ExpandableByteBuf(size);
buf.putShort(0, (short)schema.version());
@@ -192,7 +217,16 @@ public class RowAssembler {
if (nonNullVarlenKeyCols == 0)
flags |= OMIT_KEY_VARTBL_FLAG;
else
- buf.putShort(varlenTblChunkOff, (short)nonNullVarlenKeyCols);
+ writeAdaptiveItem(varlenTblChunkOff, (short)nonNullVarlenKeyCols);
+ }
+
+ public static int vartableOffSize(int vartableItems, int dataSize) {
+ if (dataSize + vartableItems + 2 < (1 << 8))
+ return 1;
+ if (dataSize + (vartableItems + 2) * 2 < (1 << 16))
+ return 2;
+ else
+ return 4;
}
/**
@@ -435,7 +469,22 @@ public class RowAssembler {
assert (flags & (baseOff == BinaryRow.KEY_CHUNK_OFFSET ? OMIT_KEY_VARTBL_FLAG : OMIT_VAL_VARTBL_FLAG)) == 0 :
"Illegal writing of varlen when 'omit vartable' flag is set for a chunk.";
- buf.putShort(varlenTblChunkOff + Row.varlenItemOffset(tblEntryIdx), (short)off);
+ writeAdaptiveItem(varlenTblChunkOff + vartableItemOffset(tblEntryIdx), off);
+ }
+
+ private void writeAdaptiveItem(int off, int val) {
+ switch (adaptiveItemSize) {
+ case Byte.BYTES:
+ buf.put(off, (byte)val);
+
+ return;
+ case Short.BYTES:
+ buf.putShort(off, (short)val);
+
+ return;
+ default:
+ buf.putInt(off, val);
+ }
}
/**
@@ -505,19 +554,19 @@ public class RowAssembler {
if (curCol == curCols.length()) {
int chunkLen = curOff - baseOff;
- buf.putInt(baseOff, chunkLen);
+ writeAdaptiveItem(baseOff, chunkLen);
if (schema.valueColumns() == curCols)
return; // No more columns.
curCols = schema.valueColumns(); // Switch key->value columns.
- initOffsets(baseOff + chunkLen, nonNullVarlenValCols);
+ initOffsets(baseOff + chunkLen, valOffSize, nonNullVarlenValCols);
if (nonNullVarlenValCols == 0)
flags |= OMIT_VAL_VARTBL_FLAG;
else
- buf.putShort(varlenTblChunkOff, (short)nonNullVarlenValCols);
+ writeAdaptiveItem(varlenTblChunkOff, (short)nonNullVarlenValCols);
}
}
@@ -525,16 +574,26 @@ public class RowAssembler {
* @param base Chunk base offset.
* @param nonNullVarlenCols Number of non-null varlen columns.
*/
- private void initOffsets(int base, int nonNullVarlenCols) {
+ private void initOffsets(int base, int adaptiveItemSize, int nonNullVarlenCols) {
baseOff = base;
+ this.adaptiveItemSize = adaptiveItemSize;
+
curCol = 0;
curVarlenTblEntry = 0;
- nullMapOff = baseOff + BinaryRow.CHUNK_LEN_FIELD_SIZE;
+ nullMapOff = baseOff + adaptiveItemSize /* chunk length. */;
varlenTblChunkOff = nullMapOff + curCols.nullMapSize();
- curOff = varlenTblChunkOff + varlenTableChunkSize(nonNullVarlenCols);
+ curOff = varlenTblChunkOff + vartableSize(nonNullVarlenCols, adaptiveItemSize);
+ }
+
+ public int vartableItemOffset(int nonNullVarlenCols) {
+ return adaptiveItemSize + nonNullVarlenCols * adaptiveItemSize;
+ }
+
+ public static int vartableSize(int items, int itemSize) {
+ return items == 0 ? 0 : itemSize + items * itemSize;
}
/**
diff --git a/modules/schema/src/main/java/org/apache/ignite/internal/schema/marshaller/reflection/JavaSerializer.java b/modules/schema/src/main/java/org/apache/ignite/internal/schema/marshaller/reflection/JavaSerializer.java
index 31bad5e..a36ef94 100644
--- a/modules/schema/src/main/java/org/apache/ignite/internal/schema/marshaller/reflection/JavaSerializer.java
+++ b/modules/schema/src/main/java/org/apache/ignite/internal/schema/marshaller/reflection/JavaSerializer.java
@@ -89,7 +89,7 @@ public class JavaSerializer extends AbstractSerializer {
schema.keyColumns(), keyStat.nonNullCols, keyStat.nonNullColsSize,
schema.valueColumns(), valStat.nonNullCols, valStat.nonNullColsSize);
- return new RowAssembler(schema, size, keyStat.nonNullCols, valStat.nonNullCols);
+ return new RowAssembler(schema, size, 2, keyStat.nonNullCols, 2, valStat.nonNullCols);
}
/**
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/TupleMarshallerImpl.java b/modules/table/src/main/java/org/apache/ignite/internal/table/TupleMarshallerImpl.java
index c944c3c..7a6aed4 100644
--- a/modules/table/src/main/java/org/apache/ignite/internal/table/TupleMarshallerImpl.java
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/TupleMarshallerImpl.java
@@ -19,8 +19,11 @@ package org.apache.ignite.internal.table;
import java.util.Arrays;
import java.util.BitSet;
+import java.util.HashMap;
+import java.util.Map;
import java.util.Objects;
import java.util.UUID;
+import org.apache.ignite.internal.schema.BinaryRow;
import org.apache.ignite.internal.schema.ByteBufferRow;
import org.apache.ignite.internal.schema.Column;
import org.apache.ignite.internal.schema.Columns;
@@ -62,28 +65,72 @@ public class TupleMarshallerImpl implements TupleMarshaller {
validate(keyTuple, schema.keyColumns());
- final RowAssembler rowBuilder = createAssembler(schema, keyTuple, valTuple);
+ ChunkData keyChunk = chunkData(schema.keyColumns(), keyTuple);
+ ChunkData valChunk = chunkData(schema.valueColumns(), valTuple);
+
+ final RowAssembler rowBuilder = createAssembler(schema, keyChunk, valChunk);
for (int i = 0; i < schema.keyColumns().length(); i++) {
final Column col = schema.keyColumns().column(i);
- writeColumn(keyTuple, col, rowBuilder);
+ writeColumn(rowBuilder, col, keyChunk.data.get(i));
}
- if (valTuple != null) {
+ if (valChunk.data != null) {
validate(valTuple, schema.valueColumns());
for (int i = 0; i < schema.valueColumns().length(); i++) {
final Column col = schema.valueColumns().column(i);
- writeColumn(valTuple, col, rowBuilder);
+ writeColumn(rowBuilder, col, valChunk.data.get(i));
}
}
return new Row(schema, new ByteBufferRow(rowBuilder.build()));
}
- /** */
+ private ChunkData chunkData(Columns cols, Tuple tuple) {
+ if (tuple == null)
+ return new ChunkData();
+
+ ChunkData chunk = new ChunkData();
+
+ chunk.data = new HashMap<>();
+
+ for (int i = 0; i < cols.length(); i++) {
+ Column col = cols.column(i);
+
+ Object val = (tuple.contains(col.name())) ? tuple.value(col.name()) : col.defaultValue();
+
+ if (val == null)
+ chunk.hasNulls = true;
+ else {
+ chunk.data.put(i, val);
+
+ if (col.type().spec().fixedLength())
+ chunk.dataSize += col.type().sizeInBytes();
+ else {
+ chunk.nonNullVarlen++;
+
+ chunk.dataSize += getValueSize(val, col.type());
+ }
+ }
+ }
+
+ return chunk;
+ }
+
+ class ChunkData {
+ public boolean hasNulls;
+ public int dataSize;
+ public int nonNullVarlen;
+ Map<Integer, Object> data;
+
+ }
+
+ /**
+ *
+ */
private void validate(Tuple tuple, Columns columns) {
if (tuple instanceof TupleBuilderImpl) {
TupleBuilderImpl t0 = (TupleBuilderImpl)tuple;
@@ -105,31 +152,27 @@ public class TupleMarshallerImpl implements TupleMarshaller {
* @param valTuple Value tuple.
* @return Row assembler.
*/
- private RowAssembler createAssembler(SchemaDescriptor schema, Tuple keyTuple, Tuple valTuple) {
- final ObjectStatistic keyStat = collectObjectStats(schema.keyColumns(), keyTuple);
- final ObjectStatistic valStat = collectObjectStats(schema.valueColumns(), valTuple);
-
- int size = RowAssembler.rowSize(
- schema.keyColumns(), keyStat.nonNullCols, keyStat.nonNullColsSize,
- schema.valueColumns(), valStat.nonNullCols, valStat.nonNullColsSize);
-
- return new RowAssembler(schema, size, keyStat.nonNullCols, valStat.nonNullCols);
+ private RowAssembler createAssembler(SchemaDescriptor schema, ChunkData keyChunk, ChunkData valChunk) {
+ final int keyOffSize = RowAssembler.vartableOffSize(keyChunk.nonNullVarlen, keyChunk.dataSize);
+ final int valOffSize = RowAssembler.vartableOffSize(valChunk.nonNullVarlen, valChunk.dataSize);
+
+ int size = BinaryRow.HEADER_SIZE +
+ (keyChunk.hasNulls ? schema.keyColumns().nullMapSize() : 0) +
+ RowAssembler.vartableSize(valChunk.nonNullVarlen, valOffSize) +
+ keyChunk.dataSize +
+ (valChunk.hasNulls ? schema.valueColumns().nullMapSize() : 0) +
+ RowAssembler.vartableSize(valChunk.nonNullVarlen, valOffSize) +
+ valChunk.dataSize;
+
+ return new RowAssembler(schema, size, keyOffSize, keyChunk.nonNullVarlen, valOffSize, valChunk.nonNullVarlen);
}
/**
- * @param tup Tuple.
- * @param col Column.
* @param rowAsm Row assembler.
+ * @param col Column.
+ * @param val Value.
*/
- private void writeColumn(Tuple tup, Column col, RowAssembler rowAsm) {
- Object val;
-
- if (!tup.contains(col.name()))
- val = col.defaultValue();
- else {
- val = tup.value(col.name());
- }
-
+ private void writeColumn(RowAssembler rowAsm, Column col, Object val) {
if (val == null) {
rowAsm.appendNull();
@@ -191,50 +234,4 @@ public class TupleMarshallerImpl implements TupleMarshaller {
throw new IllegalStateException("Unexpected value: " + col.type());
}
}
-
- /**
- * Reads object fields and gather statistic.
- *
- * @param cols Schema columns.
- * @param tup Tuple.
- * @return Object statistic.
- */
- private ObjectStatistic collectObjectStats(Columns cols, Tuple tup) {
- if (tup == null || !cols.hasVarlengthColumns())
- return new ObjectStatistic(0, 0);
-
- int cnt = 0;
- int size = 0;
-
- for (int i = cols.firstVarlengthColumn(); i < cols.length(); i++) {
- Column col = cols.column(i);
-
- final Object val = tup.contains(col.name()) ? tup.value(col.name()) : col.defaultValue();
-
- if (val == null || cols.column(i).type().spec().fixedLength())
- continue;
-
- size += getValueSize(val, cols.column(i).type());
- cnt++;
- }
-
- return new ObjectStatistic(cnt, size);
- }
-
- /**
- * Object statistic.
- */
- private static class ObjectStatistic {
- /** Non-null fields of varlen type. */
- int nonNullCols;
-
- /** Length of all non-null fields of varlen types. */
- int nonNullColsSize;
-
- /** Constructor. */
- ObjectStatistic(int nonNullCols, int nonNullColsSize) {
- this.nonNullCols = nonNullCols;
- this.nonNullColsSize = nonNullColsSize;
- }
- }
}