You are viewing a plain-text version of this content. The canonical link to it ("here") was a hyperlink that is not preserved in this plain-text rendering.
Posted to commits@pinot.apache.org by ja...@apache.org on 2022/06/17 00:51:51 UTC
[pinot] branch master updated: DataTable V4 implementation with per-column null bitmaps (#8872)
This is an automated email from the ASF dual-hosted git repository.
jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 7024a6d5e3 DataTable V4 implementation with per-column null bitmaps (#8872)
7024a6d5e3 is described below
commit 7024a6d5e3be3a33becc0ffa3319b3f1102d884c
Author: nizarhejazi <96...@users.noreply.github.com>
AuthorDate: Thu Jun 16 17:51:46 2022 -0700
DataTable V4 implementation with per-column null bitmaps (#8872)
---
.../org/apache/pinot/common/utils/DataTable.java | 3 +
.../apache/pinot/core/common/NullBitmapUtils.java | 49 ++++++++++++++
.../apache/pinot/core/common/ObjectSerDeUtils.java | 10 ++-
.../pinot/core/common/datablock/BaseDataBlock.java | 10 +++
.../core/common/datablock/DataBlockBuilder.java | 33 +++++++--
.../pinot/core/common/datablock/MetadataBlock.java | 2 +-
.../pinot/core/common/datablock/RowDataBlock.java | 25 +++++++
.../pinot/core/common/datatable/BaseDataTable.java | 10 +++
.../core/common/datatable/DataTableBuilder.java | 26 +++++--
.../pinot/core/common/datablock/DataBlockTest.java | 2 +-
.../core/common/datatable/DataTableSerDeTest.java | 79 +++++++++++++++-------
.../query/runtime/operator/HashJoinOperator.java | 2 +-
.../runtime/operator/MailboxSendOperator.java | 2 +-
13 files changed, 210 insertions(+), 43 deletions(-)
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/utils/DataTable.java b/pinot-common/src/main/java/org/apache/pinot/common/utils/DataTable.java
index 6157b5cac9..c01af3ab11 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/utils/DataTable.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/utils/DataTable.java
@@ -26,6 +26,7 @@ import java.util.Map;
import javax.annotation.Nullable;
import org.apache.pinot.common.response.ProcessingException;
import org.apache.pinot.spi.utils.ByteArray;
+import org.roaringbitmap.RoaringBitmap;
/**
@@ -76,6 +77,8 @@ public interface DataTable {
String[] getStringArray(int rowId, int colId);
+ RoaringBitmap getNullRowIds(int colId);
+
DataTable toMetadataOnlyDataTable();
DataTable toDataOnlyDataTable();
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/common/NullBitmapUtils.java b/pinot-core/src/main/java/org/apache/pinot/core/common/NullBitmapUtils.java
new file mode 100644
index 0000000000..f157d3274e
--- /dev/null
+++ b/pinot-core/src/main/java/org/apache/pinot/core/common/NullBitmapUtils.java
@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.common;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import org.roaringbitmap.RoaringBitmap;
+
+
+public final class NullBitmapUtils {
+ private NullBitmapUtils() {
+ }
+
+ public static void setNullRowIds(RoaringBitmap nullBitmap, ByteArrayOutputStream fixedSizeByteArrayOutputStream,
+ ByteArrayOutputStream variableSizeDataByteArrayOutputStream)
+ throws IOException {
+ writeInt(fixedSizeByteArrayOutputStream, variableSizeDataByteArrayOutputStream.size());
+ if (nullBitmap == null || nullBitmap.isEmpty()) {
+ writeInt(fixedSizeByteArrayOutputStream, 0);
+ } else {
+ byte[] nullBitmapBytes = ObjectSerDeUtils.ROARING_BITMAP_SER_DE.serialize(nullBitmap);
+ writeInt(fixedSizeByteArrayOutputStream, nullBitmapBytes.length);
+ variableSizeDataByteArrayOutputStream.write(nullBitmapBytes);
+ }
+ }
+
+ private static void writeInt(ByteArrayOutputStream out, int value) {
+ out.write((value >>> 24) & 0xFF);
+ out.write((value >>> 16) & 0xFF);
+ out.write((value >>> 8) & 0xFF);
+ out.write(value & 0xFF);
+ }
+}
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/common/ObjectSerDeUtils.java b/pinot-core/src/main/java/org/apache/pinot/core/common/ObjectSerDeUtils.java
index e6ed491fc8..0cd922eb6c 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/common/ObjectSerDeUtils.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/common/ObjectSerDeUtils.java
@@ -56,6 +56,7 @@ import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
+import javax.annotation.Nullable;
import org.apache.datasketches.memory.Memory;
import org.apache.datasketches.theta.Sketch;
import org.apache.pinot.core.query.distinct.DistinctTable;
@@ -119,7 +120,8 @@ public class ObjectSerDeUtils {
LongLongPair(28),
FloatLongPair(29),
DoubleLongPair(30),
- StringLongPair(31);
+ StringLongPair(31),
+ Null(100);
private final int _value;
ObjectType(int value) {
@@ -130,7 +132,11 @@ public class ObjectSerDeUtils {
return _value;
}
- public static ObjectType getObjectType(Object value) {
+ public static ObjectType getObjectType(@Nullable Object value) {
+ if (value == null) {
+ return ObjectType.Null;
+ }
+
if (value instanceof String) {
return ObjectType.String;
} else if (value instanceof Long) {
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/common/datablock/BaseDataBlock.java b/pinot-core/src/main/java/org/apache/pinot/core/common/datablock/BaseDataBlock.java
index 607fdef8bd..04a0ad1ded 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/common/datablock/BaseDataBlock.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/common/datablock/BaseDataBlock.java
@@ -35,6 +35,7 @@ import org.apache.pinot.core.common.datatable.DataTableUtils;
import org.apache.pinot.core.query.request.context.ThreadTimer;
import org.apache.pinot.spi.utils.BigDecimalUtils;
import org.apache.pinot.spi.utils.ByteArray;
+import org.roaringbitmap.RoaringBitmap;
import static java.nio.charset.StandardCharsets.UTF_8;
@@ -237,6 +238,11 @@ public abstract class BaseDataBlock implements DataTable {
return _numRows;
}
+ @Override
+ public RoaringBitmap getNullRowIds(int colId) {
+ return null;
+ }
+
// --------------------------------------------------------------------------
// Fixed sized element access.
// --------------------------------------------------------------------------
@@ -296,6 +302,10 @@ public abstract class BaseDataBlock implements DataTable {
public <T> T getObject(int rowId, int colId) {
int size = positionCursorInVariableBuffer(rowId, colId);
int objectTypeValue = _variableSizeData.getInt();
+ if (size == 0) {
+ assert objectTypeValue == ObjectSerDeUtils.ObjectType.Null.getValue();
+ return null;
+ }
ByteBuffer byteBuffer = _variableSizeData.slice();
byteBuffer.limit(size);
return ObjectSerDeUtils.deserialize(byteBuffer, objectTypeValue);
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/common/datablock/DataBlockBuilder.java b/pinot-core/src/main/java/org/apache/pinot/core/common/datablock/DataBlockBuilder.java
index ff6cc4e43c..8a14b491e9 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/common/datablock/DataBlockBuilder.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/common/datablock/DataBlockBuilder.java
@@ -26,11 +26,14 @@ import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import javax.annotation.Nullable;
import org.apache.pinot.common.utils.DataSchema;
+import org.apache.pinot.core.common.NullBitmapUtils;
import org.apache.pinot.core.common.ObjectSerDeUtils;
import org.apache.pinot.spi.utils.ArrayCopyUtils;
import org.apache.pinot.spi.utils.BigDecimalUtils;
import org.apache.pinot.spi.utils.ByteArray;
+import org.roaringbitmap.RoaringBitmap;
public class DataBlockBuilder {
@@ -54,8 +57,6 @@ public class DataBlockBuilder {
new DataOutputStream(_variableSizeDataByteArrayOutputStream);
- private ByteBuffer _currentRowDataByteBuffer;
-
private DataBlockBuilder(DataSchema dataSchema, BaseDataBlock.Type blockType) {
_dataSchema = dataSchema;
_columnDataType = dataSchema.getStoredColumnDataTypes();
@@ -76,7 +77,14 @@ public class DataBlockBuilder {
}
}
- public static RowDataBlock buildFromRows(List<Object[]> rows, DataSchema dataSchema)
+ public void setNullRowIds(RoaringBitmap nullBitmap)
+ throws IOException {
+ NullBitmapUtils.setNullRowIds(nullBitmap, _fixedSizeDataByteArrayOutputStream,
+ _variableSizeDataByteArrayOutputStream);
+ }
+
+ public static RowDataBlock buildFromRows(List<Object[]> rows, @Nullable RoaringBitmap[] colNullBitmaps,
+ DataSchema dataSchema)
throws IOException {
DataBlockBuilder rowBuilder = new DataBlockBuilder(dataSchema, BaseDataBlock.Type.ROW);
rowBuilder._numRows = rows.size();
@@ -167,6 +175,12 @@ public class DataBlockBuilder {
}
rowBuilder._fixedSizeDataByteArrayOutputStream.write(byteBuffer.array(), 0, byteBuffer.position());
}
+ // Write null bitmaps after writing data.
+ if (colNullBitmaps != null) {
+ for (RoaringBitmap nullBitmap : colNullBitmaps) {
+ rowBuilder.setNullRowIds(nullBitmap);
+ }
+ }
return buildRowBlock(rowBuilder);
}
@@ -337,10 +351,15 @@ public class DataBlockBuilder {
throws IOException {
byteBuffer.putInt(builder._variableSizeDataByteArrayOutputStream.size());
int objectTypeValue = ObjectSerDeUtils.ObjectType.getObjectType(value).getValue();
- byte[] bytes = ObjectSerDeUtils.serialize(value, objectTypeValue);
- byteBuffer.putInt(bytes.length);
- builder._variableSizeDataOutputStream.writeInt(objectTypeValue);
- builder._variableSizeDataByteArrayOutputStream.write(bytes);
+ if (objectTypeValue == ObjectSerDeUtils.ObjectType.Null.getValue()) {
+ byteBuffer.putInt(0);
+ builder._variableSizeDataOutputStream.writeInt(objectTypeValue);
+ } else {
+ byte[] bytes = ObjectSerDeUtils.serialize(value, objectTypeValue);
+ byteBuffer.putInt(bytes.length);
+ builder._variableSizeDataOutputStream.writeInt(objectTypeValue);
+ builder._variableSizeDataByteArrayOutputStream.write(bytes);
+ }
}
private static void setColumn(DataBlockBuilder builder, ByteBuffer byteBuffer, int[] values)
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/common/datablock/MetadataBlock.java b/pinot-core/src/main/java/org/apache/pinot/core/common/datablock/MetadataBlock.java
index 9be572d938..06a637fc24 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/common/datablock/MetadataBlock.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/common/datablock/MetadataBlock.java
@@ -40,7 +40,7 @@ public class MetadataBlock extends BaseDataBlock {
}
@Override
- protected int getDataBlockVersionType() {
+ public int getDataBlockVersionType() {
return VERSION + (Type.METADATA.ordinal() << DataBlockUtils.VERSION_TYPE_SHIFT);
}
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/common/datablock/RowDataBlock.java b/pinot-core/src/main/java/org/apache/pinot/core/common/datablock/RowDataBlock.java
index bece1a7eaf..eec72546f0 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/common/datablock/RowDataBlock.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/common/datablock/RowDataBlock.java
@@ -22,6 +22,8 @@ import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Map;
import org.apache.pinot.common.utils.DataSchema;
+import org.apache.pinot.core.common.ObjectSerDeUtils;
+import org.roaringbitmap.RoaringBitmap;
/**
@@ -48,6 +50,29 @@ public class RowDataBlock extends BaseDataBlock {
computeBlockObjectConstants();
}
+ @Override
+ public RoaringBitmap getNullRowIds(int colId) {
+ // _fixedSizeData stores two ints per col's null bitmap: offset, and length.
+ int position = _numRows * _rowSizeInBytes + colId * Integer.BYTES * 2;
+ if (position >= _fixedSizeData.limit()) {
+ return null;
+ }
+
+ _fixedSizeData.position(position);
+ int offset = _fixedSizeData.getInt();
+ int bytesLength = _fixedSizeData.getInt();
+ RoaringBitmap nullBitmap;
+ if (bytesLength > 0) {
+ _variableSizeData.position(offset);
+ byte[] nullBitmapBytes = new byte[bytesLength];
+ _variableSizeData.get(nullBitmapBytes);
+ nullBitmap = ObjectSerDeUtils.ROARING_BITMAP_SER_DE.deserialize(nullBitmapBytes);
+ } else {
+ nullBitmap = new RoaringBitmap();
+ }
+ return nullBitmap;
+ }
+
protected void computeBlockObjectConstants() {
if (_dataSchema != null) {
_columnOffsets = new int[_numColumns];
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/common/datatable/BaseDataTable.java b/pinot-core/src/main/java/org/apache/pinot/core/common/datatable/BaseDataTable.java
index 788fe62765..a54079eb66 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/common/datatable/BaseDataTable.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/common/datatable/BaseDataTable.java
@@ -32,6 +32,7 @@ import org.apache.pinot.core.common.ObjectSerDeUtils;
import org.apache.pinot.spi.utils.BigDecimalUtils;
import org.apache.pinot.spi.utils.ByteArray;
import org.apache.pinot.spi.utils.BytesUtils;
+import org.roaringbitmap.RoaringBitmap;
import static java.nio.charset.StandardCharsets.UTF_8;
@@ -204,6 +205,10 @@ public abstract class BaseDataTable implements DataTable {
public <T> T getObject(int rowId, int colId) {
int size = positionCursorInVariableBuffer(rowId, colId);
int objectTypeValue = _variableSizeData.getInt();
+ if (size == 0) {
+ assert objectTypeValue == ObjectSerDeUtils.ObjectType.Null.getValue();
+ return null;
+ }
ByteBuffer byteBuffer = _variableSizeData.slice();
byteBuffer.limit(size);
return ObjectSerDeUtils.deserialize(byteBuffer, objectTypeValue);
@@ -260,6 +265,11 @@ public abstract class BaseDataTable implements DataTable {
return strings;
}
+ @Override
+ public RoaringBitmap getNullRowIds(int colId) {
+ return null;
+ }
+
private int positionCursorInVariableBuffer(int rowId, int colId) {
_fixedSizeData.position(rowId * _rowSizeInBytes + _columnOffsets[colId]);
_variableSizeData.position(_fixedSizeData.getInt());
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/common/datatable/DataTableBuilder.java b/pinot-core/src/main/java/org/apache/pinot/core/common/datatable/DataTableBuilder.java
index 05f29bd774..941fabc2ee 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/common/datatable/DataTableBuilder.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/common/datatable/DataTableBuilder.java
@@ -27,9 +27,11 @@ import java.util.HashMap;
import java.util.Map;
import org.apache.pinot.common.utils.DataSchema;
import org.apache.pinot.common.utils.DataTable;
+import org.apache.pinot.core.common.NullBitmapUtils;
import org.apache.pinot.core.common.ObjectSerDeUtils;
import org.apache.pinot.spi.utils.BigDecimalUtils;
import org.apache.pinot.spi.utils.ByteArray;
+import org.roaringbitmap.RoaringBitmap;
/**
@@ -127,11 +129,22 @@ public class DataTableBuilder {
_version = version;
}
+ public static int getCurrentDataTableVersion() {
+ return _version;
+ }
+
public void startRow() {
_numRows++;
_currentRowDataByteBuffer = ByteBuffer.allocate(_rowSizeInBytes);
}
+ public void setNullRowIds(RoaringBitmap nullBitmap)
+ throws IOException {
+ assert _version >= VERSION_4;
+ NullBitmapUtils.setNullRowIds(nullBitmap, _fixedSizeDataByteArrayOutputStream,
+ _variableSizeDataByteArrayOutputStream);
+ }
+
public void setColumn(int colId, boolean value) {
_currentRowDataByteBuffer.position(_columnOffsets[colId]);
if (value) {
@@ -223,10 +236,15 @@ public class DataTableBuilder {
_currentRowDataByteBuffer.position(_columnOffsets[colId]);
_currentRowDataByteBuffer.putInt(_variableSizeDataByteArrayOutputStream.size());
int objectTypeValue = ObjectSerDeUtils.ObjectType.getObjectType(value).getValue();
- byte[] bytes = ObjectSerDeUtils.serialize(value, objectTypeValue);
- _currentRowDataByteBuffer.putInt(bytes.length);
- _variableSizeDataOutputStream.writeInt(objectTypeValue);
- _variableSizeDataByteArrayOutputStream.write(bytes);
+ if (objectTypeValue == ObjectSerDeUtils.ObjectType.Null.getValue()) {
+ _currentRowDataByteBuffer.putInt(0);
+ _variableSizeDataOutputStream.writeInt(objectTypeValue);
+ } else {
+ byte[] bytes = ObjectSerDeUtils.serialize(value, objectTypeValue);
+ _currentRowDataByteBuffer.putInt(bytes.length);
+ _variableSizeDataOutputStream.writeInt(objectTypeValue);
+ _variableSizeDataByteArrayOutputStream.write(bytes);
+ }
}
public void setColumn(int colId, int[] values)
diff --git a/pinot-core/src/test/java/org/apache/pinot/core/common/datablock/DataBlockTest.java b/pinot-core/src/test/java/org/apache/pinot/core/common/datablock/DataBlockTest.java
index 4ede7f2cd7..d7ec9a7bfa 100644
--- a/pinot-core/src/test/java/org/apache/pinot/core/common/datablock/DataBlockTest.java
+++ b/pinot-core/src/test/java/org/apache/pinot/core/common/datablock/DataBlockTest.java
@@ -109,7 +109,7 @@ public class DataBlockTest {
DataSchema dataSchema = new DataSchema(columnNames, columnDataTypes);
List<Object[]> rows = DataBlockTestUtils.getRandomRows(dataSchema, TEST_ROW_COUNT);
List<Object[]> columnars = DataBlockTestUtils.convertColumnar(dataSchema, rows);
- RowDataBlock rowBlock = DataBlockBuilder.buildFromRows(rows, dataSchema);
+ RowDataBlock rowBlock = DataBlockBuilder.buildFromRows(rows, null, dataSchema);
ColumnarDataBlock columnarBlock = DataBlockBuilder.buildFromColumns(columnars, dataSchema);
for (int colId = 0; colId < dataSchema.getColumnNames().length; colId++) {
diff --git a/pinot-core/src/test/java/org/apache/pinot/core/common/datatable/DataTableSerDeTest.java b/pinot-core/src/test/java/org/apache/pinot/core/common/datatable/DataTableSerDeTest.java
index 8dbc3910f6..fbe9d98ed9 100644
--- a/pinot-core/src/test/java/org/apache/pinot/core/common/datatable/DataTableSerDeTest.java
+++ b/pinot-core/src/test/java/org/apache/pinot/core/common/datatable/DataTableSerDeTest.java
@@ -38,6 +38,7 @@ import org.apache.pinot.common.utils.DataTable;
import org.apache.pinot.common.utils.DataTable.MetadataKey;
import org.apache.pinot.core.query.request.context.ThreadTimer;
import org.apache.pinot.spi.utils.ByteArray;
+import org.roaringbitmap.RoaringBitmap;
import org.testng.Assert;
import org.testng.annotations.Test;
@@ -219,7 +220,7 @@ public class DataTableSerDeTest {
DataTableBuilder.setCurrentDataTableVersion(DataTableBuilder.VERSION_4);
DataTableBuilder dataTableBuilderV4WithDataOnly = new DataTableBuilder(dataSchema);
fillDataTableWithRandomData(dataTableBuilderV4WithDataOnly, columnDataTypes, numColumns);
- DataTable dataTableV4 = dataTableBuilderV4WithDataOnly.build(); // create a V3 data table
+ DataTable dataTableV4 = dataTableBuilderV4WithDataOnly.build(); // create a V4 data table
// Deserialize data table bytes as V4
newDataTable = DataTableFactory.getDataTable(dataTableV4.toBytes());
Assert.assertEquals(newDataTable.getDataSchema(), dataSchema, ERROR_MESSAGE);
@@ -269,7 +270,7 @@ public class DataTableSerDeTest {
for (String key : EXPECTED_METADATA.keySet()) {
dataTableV4.getMetadata().put(key, EXPECTED_METADATA.get(key));
}
- // Deserialize data table bytes as V3
+ // Deserialize data table bytes as V4
newDataTable = DataTableFactory.getDataTable(dataTableV4.toBytes()); // Broker deserialize data table bytes as V4
Assert.assertEquals(newDataTable.getDataSchema(), dataSchema, ERROR_MESSAGE);
Assert.assertEquals(newDataTable.getNumberOfRows(), NUM_ROWS, ERROR_MESSAGE);
@@ -462,7 +463,7 @@ public class DataTableSerDeTest {
}
@Test
- public void testDataTableMetadataBytesLayout()
+ public void testDataTableVer3MetadataBytesLayout()
throws IOException {
DataSchema.ColumnDataType[] columnDataTypes = DataSchema.ColumnDataType.values();
int numColumns = columnDataTypes.length;
@@ -471,7 +472,9 @@ public class DataTableSerDeTest {
columnNames[i] = columnDataTypes[i].name();
}
+ ThreadTimer.setThreadCpuTimeMeasurementEnabled(false);
DataSchema dataSchema = new DataSchema(columnNames, columnDataTypes);
+ DataTableBuilder.setCurrentDataTableVersion(DataTableBuilder.VERSION_3);
DataTableBuilder dataTableBuilder = new DataTableBuilder(dataSchema);
fillDataTableWithRandomData(dataTableBuilder, columnDataTypes, numColumns);
@@ -506,7 +509,7 @@ public class DataTableSerDeTest {
try (ByteArrayInputStream byteArrayInputStream = new ByteArrayInputStream(metadataBytes);
DataInputStream dataInputStream = new DataInputStream(byteArrayInputStream)) {
int numEntries = dataInputStream.readInt();
- // DataTable V3 serialization logic will add an extra RESPONSE_SER_CPU_TIME_NS KV pair into metadata
+ // DataTable V3 and V4 serialization logic will add an extra RESPONSE_SER_CPU_TIME_NS KV pair into metadata
Assert.assertEquals(numEntries, EXPECTED_METADATA.size());
for (int i = 0; i < numEntries; i++) {
int keyOrdinal = dataInputStream.readInt();
@@ -538,53 +541,65 @@ public class DataTableSerDeTest {
private void fillDataTableWithRandomData(DataTableBuilder dataTableBuilder,
DataSchema.ColumnDataType[] columnDataTypes, int numColumns)
throws IOException {
+ RoaringBitmap[] nullBitmaps = null;
+ if (DataTableBuilder.getCurrentDataTableVersion() >= DataTableBuilder.VERSION_4) {
+ nullBitmaps = new RoaringBitmap[numColumns];
+ for (int colId = 0; colId < numColumns; colId++) {
+ nullBitmaps[colId] = new RoaringBitmap();
+ }
+ }
for (int rowId = 0; rowId < NUM_ROWS; rowId++) {
dataTableBuilder.startRow();
for (int colId = 0; colId < numColumns; colId++) {
+ // Note: isNull is handled for SV columns only for now.
+ boolean isNull = nullBitmaps != null && RANDOM.nextFloat() < 0.1;
+ if (isNull) {
+ nullBitmaps[colId].add(rowId);
+ }
switch (columnDataTypes[colId]) {
case INT:
- INTS[rowId] = RANDOM.nextInt();
+ INTS[rowId] = isNull ? 0 : RANDOM.nextInt();
dataTableBuilder.setColumn(colId, INTS[rowId]);
break;
case LONG:
- LONGS[rowId] = RANDOM.nextLong();
+ LONGS[rowId] = isNull ? 0 : RANDOM.nextLong();
dataTableBuilder.setColumn(colId, LONGS[rowId]);
break;
case FLOAT:
- FLOATS[rowId] = RANDOM.nextFloat();
+ FLOATS[rowId] = isNull ? 0 : RANDOM.nextFloat();
dataTableBuilder.setColumn(colId, FLOATS[rowId]);
break;
case DOUBLE:
- DOUBLES[rowId] = RANDOM.nextDouble();
+ DOUBLES[rowId] = isNull ? 0.0 : RANDOM.nextDouble();
dataTableBuilder.setColumn(colId, DOUBLES[rowId]);
break;
case BIG_DECIMAL:
- BIG_DECIMALS[rowId] = BigDecimal.valueOf(RANDOM.nextDouble());
+ BIG_DECIMALS[rowId] = isNull ? BigDecimal.ZERO : BigDecimal.valueOf(RANDOM.nextDouble());
dataTableBuilder.setColumn(colId, BIG_DECIMALS[rowId]);
break;
case TIMESTAMP:
- TIMESTAMPS[rowId] = RANDOM.nextLong();
+ TIMESTAMPS[rowId] = isNull ? 0 : RANDOM.nextLong();
dataTableBuilder.setColumn(colId, TIMESTAMPS[rowId]);
break;
case BOOLEAN:
- BOOLEANS[rowId] = RANDOM.nextInt(2);
+ BOOLEANS[rowId] = isNull ? 0 : RANDOM.nextInt(2);
dataTableBuilder.setColumn(colId, BOOLEANS[rowId]);
break;
case STRING:
- STRINGS[rowId] = RandomStringUtils.random(RANDOM.nextInt(20));
+ STRINGS[rowId] = isNull ? "" : RandomStringUtils.random(RANDOM.nextInt(20));
dataTableBuilder.setColumn(colId, STRINGS[rowId]);
break;
case JSON:
- JSONS[rowId] = "{\"key\": \"" + RandomStringUtils.random(RANDOM.nextInt(20)) + "\"}";
+ JSONS[rowId] = isNull ? "" : "{\"key\": \"" + RandomStringUtils.random(RANDOM.nextInt(20)) + "\"}";
dataTableBuilder.setColumn(colId, JSONS[rowId]);
break;
case BYTES:
- BYTES[rowId] = RandomStringUtils.random(RANDOM.nextInt(20)).getBytes();
+ BYTES[rowId] = isNull ? new byte[0] : RandomStringUtils.random(RANDOM.nextInt(20)).getBytes();
dataTableBuilder.setColumn(colId, new ByteArray(BYTES[rowId]));
break;
// Just test Double here, all object types will be covered in ObjectCustomSerDeTest.
case OBJECT:
- OBJECTS[rowId] = RANDOM.nextDouble();
+ OBJECTS[rowId] = isNull ? null : RANDOM.nextDouble();
dataTableBuilder.setColumn(colId, OBJECTS[rowId]);
break;
case INT_ARRAY:
@@ -659,44 +674,56 @@ public class DataTableSerDeTest {
}
dataTableBuilder.finishRow();
}
+ if (nullBitmaps != null) {
+ for (int colId = 0; colId < numColumns; colId++) {
+ dataTableBuilder.setNullRowIds(nullBitmaps[colId]);
+ }
+ }
}
private void verifyDataIsSame(DataTable newDataTable, DataSchema.ColumnDataType[] columnDataTypes, int numColumns) {
+ RoaringBitmap[] nullBitmaps = new RoaringBitmap[numColumns];
+ for (int colId = 0; colId < numColumns; colId++) {
+ nullBitmaps[colId] = newDataTable.getNullRowIds(colId);
+ }
for (int rowId = 0; rowId < NUM_ROWS; rowId++) {
for (int colId = 0; colId < numColumns; colId++) {
+ boolean isNull = nullBitmaps[colId] != null && nullBitmaps[colId].contains(rowId);
switch (columnDataTypes[colId]) {
case INT:
- Assert.assertEquals(newDataTable.getInt(rowId, colId), INTS[rowId], ERROR_MESSAGE);
+ Assert.assertEquals(newDataTable.getInt(rowId, colId), isNull ? 0 : INTS[rowId], ERROR_MESSAGE);
break;
case LONG:
- Assert.assertEquals(newDataTable.getLong(rowId, colId), LONGS[rowId], ERROR_MESSAGE);
+ Assert.assertEquals(newDataTable.getLong(rowId, colId), isNull ? 0 : LONGS[rowId], ERROR_MESSAGE);
break;
case FLOAT:
- Assert.assertEquals(newDataTable.getFloat(rowId, colId), FLOATS[rowId], ERROR_MESSAGE);
+ Assert.assertEquals(newDataTable.getFloat(rowId, colId), isNull ? 0 : FLOATS[rowId], ERROR_MESSAGE);
break;
case DOUBLE:
- Assert.assertEquals(newDataTable.getDouble(rowId, colId), DOUBLES[rowId], ERROR_MESSAGE);
+ Assert.assertEquals(newDataTable.getDouble(rowId, colId), isNull ? 0.0 : DOUBLES[rowId], ERROR_MESSAGE);
break;
case BIG_DECIMAL:
- Assert.assertEquals(newDataTable.getBigDecimal(rowId, colId), BIG_DECIMALS[rowId], ERROR_MESSAGE);
+ Assert.assertEquals(newDataTable.getBigDecimal(rowId, colId), isNull ? BigDecimal.ZERO
+ : BIG_DECIMALS[rowId], ERROR_MESSAGE);
break;
case BOOLEAN:
- Assert.assertEquals(newDataTable.getInt(rowId, colId), BOOLEANS[rowId], ERROR_MESSAGE);
+ Assert.assertEquals(newDataTable.getInt(rowId, colId), isNull ? 0 : BOOLEANS[rowId], ERROR_MESSAGE);
break;
case TIMESTAMP:
- Assert.assertEquals(newDataTable.getLong(rowId, colId), TIMESTAMPS[rowId], ERROR_MESSAGE);
+ Assert.assertEquals(newDataTable.getLong(rowId, colId), isNull ? 0 : TIMESTAMPS[rowId], ERROR_MESSAGE);
break;
case STRING:
- Assert.assertEquals(newDataTable.getString(rowId, colId), STRINGS[rowId], ERROR_MESSAGE);
+ Assert.assertEquals(newDataTable.getString(rowId, colId), isNull ? "" : STRINGS[rowId], ERROR_MESSAGE);
break;
case JSON:
- Assert.assertEquals(newDataTable.getString(rowId, colId), JSONS[rowId], ERROR_MESSAGE);
+ Assert.assertEquals(newDataTable.getString(rowId, colId), isNull ? "" : JSONS[rowId], ERROR_MESSAGE);
break;
case BYTES:
- Assert.assertEquals(newDataTable.getBytes(rowId, colId).getBytes(), BYTES[rowId], ERROR_MESSAGE);
+ Assert.assertEquals(newDataTable.getBytes(rowId, colId).getBytes(), isNull ? new byte[0] : BYTES[rowId],
+ ERROR_MESSAGE);
break;
case OBJECT:
- Assert.assertEquals(newDataTable.getObject(rowId, colId), OBJECTS[rowId], ERROR_MESSAGE);
+ Assert.assertEquals(newDataTable.getObject(rowId, colId), isNull ? null : OBJECTS[rowId], ERROR_MESSAGE);
break;
case INT_ARRAY:
Assert.assertTrue(Arrays.equals(newDataTable.getIntArray(rowId, colId), INT_ARRAYS[rowId]), ERROR_MESSAGE);
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/HashJoinOperator.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/HashJoinOperator.java
index ae1bf43d4c..5bbe81428f 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/HashJoinOperator.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/HashJoinOperator.java
@@ -129,7 +129,7 @@ public class HashJoinOperator extends BaseOperator<TransferableBlock> {
rows.add(joinRow(leftRow, rightRow));
}
}
- return DataBlockBuilder.buildFromRows(rows, computeSchema());
+ return DataBlockBuilder.buildFromRows(rows, null, computeSchema());
}
private Object[] joinRow(Object[] leftRow, Object[] rightRow) {
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxSendOperator.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxSendOperator.java
index 8b5ccd5b99..cacf73572c 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxSendOperator.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxSendOperator.java
@@ -184,7 +184,7 @@ public class MailboxSendOperator extends BaseOperator<TransferableBlock> {
List<BaseDataBlock> dataTableList = new ArrayList<>(partitionSize);
for (int i = 0; i < partitionSize; i++) {
List<Object[]> objects = temporaryRows.get(i);
- dataTableList.add(DataBlockBuilder.buildFromRows(objects, dataTable.getDataSchema()));
+ dataTableList.add(DataBlockBuilder.buildFromRows(objects, null, dataTable.getDataSchema()));
}
return dataTableList;
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org