You are viewing a plain text version of this content. The canonical link for it is on the original mailing-list archive page (the hyperlink was lost in this plain-text rendering).
Posted to commits@pinot.apache.org by ja...@apache.org on 2022/06/17 00:51:51 UTC

[pinot] branch master updated: DataTable V4 implementation with per-column null bitmaps (#8872)

This is an automated email from the ASF dual-hosted git repository.

jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 7024a6d5e3 DataTable V4 implementation with per-column null bitmaps (#8872)
7024a6d5e3 is described below

commit 7024a6d5e3be3a33becc0ffa3319b3f1102d884c
Author: nizarhejazi <96...@users.noreply.github.com>
AuthorDate: Thu Jun 16 17:51:46 2022 -0700

    DataTable V4 implementation with per-column null bitmaps (#8872)
---
 .../org/apache/pinot/common/utils/DataTable.java   |  3 +
 .../apache/pinot/core/common/NullBitmapUtils.java  | 49 ++++++++++++++
 .../apache/pinot/core/common/ObjectSerDeUtils.java | 10 ++-
 .../pinot/core/common/datablock/BaseDataBlock.java | 10 +++
 .../core/common/datablock/DataBlockBuilder.java    | 33 +++++++--
 .../pinot/core/common/datablock/MetadataBlock.java |  2 +-
 .../pinot/core/common/datablock/RowDataBlock.java  | 25 +++++++
 .../pinot/core/common/datatable/BaseDataTable.java | 10 +++
 .../core/common/datatable/DataTableBuilder.java    | 26 +++++--
 .../pinot/core/common/datablock/DataBlockTest.java |  2 +-
 .../core/common/datatable/DataTableSerDeTest.java  | 79 +++++++++++++++-------
 .../query/runtime/operator/HashJoinOperator.java   |  2 +-
 .../runtime/operator/MailboxSendOperator.java      |  2 +-
 13 files changed, 210 insertions(+), 43 deletions(-)

diff --git a/pinot-common/src/main/java/org/apache/pinot/common/utils/DataTable.java b/pinot-common/src/main/java/org/apache/pinot/common/utils/DataTable.java
index 6157b5cac9..c01af3ab11 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/utils/DataTable.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/utils/DataTable.java
@@ -26,6 +26,7 @@ import java.util.Map;
 import javax.annotation.Nullable;
 import org.apache.pinot.common.response.ProcessingException;
 import org.apache.pinot.spi.utils.ByteArray;
+import org.roaringbitmap.RoaringBitmap;
 
 
 /**
@@ -76,6 +77,8 @@ public interface DataTable {
 
   String[] getStringArray(int rowId, int colId);
 
+  RoaringBitmap getNullRowIds(int colId);
+
   DataTable toMetadataOnlyDataTable();
 
   DataTable toDataOnlyDataTable();
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/common/NullBitmapUtils.java b/pinot-core/src/main/java/org/apache/pinot/core/common/NullBitmapUtils.java
new file mode 100644
index 0000000000..f157d3274e
--- /dev/null
+++ b/pinot-core/src/main/java/org/apache/pinot/core/common/NullBitmapUtils.java
@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.common;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import org.roaringbitmap.RoaringBitmap;
+
+
+public final class NullBitmapUtils {
+  private NullBitmapUtils() {
+  }
+
+  public static void setNullRowIds(RoaringBitmap nullBitmap, ByteArrayOutputStream fixedSizeByteArrayOutputStream,
+      ByteArrayOutputStream variableSizeDataByteArrayOutputStream)
+      throws IOException {
+    writeInt(fixedSizeByteArrayOutputStream, variableSizeDataByteArrayOutputStream.size());
+    if (nullBitmap == null || nullBitmap.isEmpty()) {
+      writeInt(fixedSizeByteArrayOutputStream, 0);
+    } else {
+      byte[] nullBitmapBytes = ObjectSerDeUtils.ROARING_BITMAP_SER_DE.serialize(nullBitmap);
+      writeInt(fixedSizeByteArrayOutputStream, nullBitmapBytes.length);
+      variableSizeDataByteArrayOutputStream.write(nullBitmapBytes);
+    }
+  }
+
+  private static void writeInt(ByteArrayOutputStream out, int value) {
+    out.write((value >>> 24) & 0xFF);
+    out.write((value >>> 16) & 0xFF);
+    out.write((value >>> 8) & 0xFF);
+    out.write(value & 0xFF);
+  }
+}
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/common/ObjectSerDeUtils.java b/pinot-core/src/main/java/org/apache/pinot/core/common/ObjectSerDeUtils.java
index e6ed491fc8..0cd922eb6c 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/common/ObjectSerDeUtils.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/common/ObjectSerDeUtils.java
@@ -56,6 +56,7 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import javax.annotation.Nullable;
 import org.apache.datasketches.memory.Memory;
 import org.apache.datasketches.theta.Sketch;
 import org.apache.pinot.core.query.distinct.DistinctTable;
@@ -119,7 +120,8 @@ public class ObjectSerDeUtils {
     LongLongPair(28),
     FloatLongPair(29),
     DoubleLongPair(30),
-    StringLongPair(31);
+    StringLongPair(31),
+    Null(100);
     private final int _value;
 
     ObjectType(int value) {
@@ -130,7 +132,11 @@ public class ObjectSerDeUtils {
       return _value;
     }
 
-    public static ObjectType getObjectType(Object value) {
+    public static ObjectType getObjectType(@Nullable Object value) {
+      if (value == null) {
+        return ObjectType.Null;
+      }
+
       if (value instanceof String) {
         return ObjectType.String;
       } else if (value instanceof Long) {
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/common/datablock/BaseDataBlock.java b/pinot-core/src/main/java/org/apache/pinot/core/common/datablock/BaseDataBlock.java
index 607fdef8bd..04a0ad1ded 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/common/datablock/BaseDataBlock.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/common/datablock/BaseDataBlock.java
@@ -35,6 +35,7 @@ import org.apache.pinot.core.common.datatable.DataTableUtils;
 import org.apache.pinot.core.query.request.context.ThreadTimer;
 import org.apache.pinot.spi.utils.BigDecimalUtils;
 import org.apache.pinot.spi.utils.ByteArray;
+import org.roaringbitmap.RoaringBitmap;
 
 import static java.nio.charset.StandardCharsets.UTF_8;
 
@@ -237,6 +238,11 @@ public abstract class BaseDataBlock implements DataTable {
     return _numRows;
   }
 
+  @Override
+  public RoaringBitmap getNullRowIds(int colId) {
+    return null;
+  }
+
   // --------------------------------------------------------------------------
   // Fixed sized element access.
   // --------------------------------------------------------------------------
@@ -296,6 +302,10 @@ public abstract class BaseDataBlock implements DataTable {
   public <T> T getObject(int rowId, int colId) {
     int size = positionCursorInVariableBuffer(rowId, colId);
     int objectTypeValue = _variableSizeData.getInt();
+    if (size == 0) {
+      assert objectTypeValue == ObjectSerDeUtils.ObjectType.Null.getValue();
+      return null;
+    }
     ByteBuffer byteBuffer = _variableSizeData.slice();
     byteBuffer.limit(size);
     return ObjectSerDeUtils.deserialize(byteBuffer, objectTypeValue);
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/common/datablock/DataBlockBuilder.java b/pinot-core/src/main/java/org/apache/pinot/core/common/datablock/DataBlockBuilder.java
index ff6cc4e43c..8a14b491e9 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/common/datablock/DataBlockBuilder.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/common/datablock/DataBlockBuilder.java
@@ -26,11 +26,14 @@ import java.nio.ByteBuffer;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import javax.annotation.Nullable;
 import org.apache.pinot.common.utils.DataSchema;
+import org.apache.pinot.core.common.NullBitmapUtils;
 import org.apache.pinot.core.common.ObjectSerDeUtils;
 import org.apache.pinot.spi.utils.ArrayCopyUtils;
 import org.apache.pinot.spi.utils.BigDecimalUtils;
 import org.apache.pinot.spi.utils.ByteArray;
+import org.roaringbitmap.RoaringBitmap;
 
 
 public class DataBlockBuilder {
@@ -54,8 +57,6 @@ public class DataBlockBuilder {
       new DataOutputStream(_variableSizeDataByteArrayOutputStream);
 
 
-  private ByteBuffer _currentRowDataByteBuffer;
-
   private DataBlockBuilder(DataSchema dataSchema, BaseDataBlock.Type blockType) {
     _dataSchema = dataSchema;
     _columnDataType = dataSchema.getStoredColumnDataTypes();
@@ -76,7 +77,14 @@ public class DataBlockBuilder {
     }
   }
 
-  public static RowDataBlock buildFromRows(List<Object[]> rows, DataSchema dataSchema)
+  public void setNullRowIds(RoaringBitmap nullBitmap)
+      throws IOException {
+    NullBitmapUtils.setNullRowIds(nullBitmap, _fixedSizeDataByteArrayOutputStream,
+        _variableSizeDataByteArrayOutputStream);
+  }
+
+  public static RowDataBlock buildFromRows(List<Object[]> rows, @Nullable RoaringBitmap[] colNullBitmaps,
+      DataSchema dataSchema)
       throws IOException {
     DataBlockBuilder rowBuilder = new DataBlockBuilder(dataSchema, BaseDataBlock.Type.ROW);
     rowBuilder._numRows = rows.size();
@@ -167,6 +175,12 @@ public class DataBlockBuilder {
       }
       rowBuilder._fixedSizeDataByteArrayOutputStream.write(byteBuffer.array(), 0, byteBuffer.position());
     }
+    // Write null bitmaps after writing data.
+    if (colNullBitmaps != null) {
+      for (RoaringBitmap nullBitmap : colNullBitmaps) {
+        rowBuilder.setNullRowIds(nullBitmap);
+      }
+    }
     return buildRowBlock(rowBuilder);
   }
 
@@ -337,10 +351,15 @@ public class DataBlockBuilder {
       throws IOException {
     byteBuffer.putInt(builder._variableSizeDataByteArrayOutputStream.size());
     int objectTypeValue = ObjectSerDeUtils.ObjectType.getObjectType(value).getValue();
-    byte[] bytes = ObjectSerDeUtils.serialize(value, objectTypeValue);
-    byteBuffer.putInt(bytes.length);
-    builder._variableSizeDataOutputStream.writeInt(objectTypeValue);
-    builder._variableSizeDataByteArrayOutputStream.write(bytes);
+    if (objectTypeValue == ObjectSerDeUtils.ObjectType.Null.getValue()) {
+      byteBuffer.putInt(0);
+      builder._variableSizeDataOutputStream.writeInt(objectTypeValue);
+    } else {
+      byte[] bytes = ObjectSerDeUtils.serialize(value, objectTypeValue);
+      byteBuffer.putInt(bytes.length);
+      builder._variableSizeDataOutputStream.writeInt(objectTypeValue);
+      builder._variableSizeDataByteArrayOutputStream.write(bytes);
+    }
   }
 
   private static void setColumn(DataBlockBuilder builder, ByteBuffer byteBuffer, int[] values)
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/common/datablock/MetadataBlock.java b/pinot-core/src/main/java/org/apache/pinot/core/common/datablock/MetadataBlock.java
index 9be572d938..06a637fc24 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/common/datablock/MetadataBlock.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/common/datablock/MetadataBlock.java
@@ -40,7 +40,7 @@ public class MetadataBlock extends BaseDataBlock {
   }
 
   @Override
-  protected int getDataBlockVersionType() {
+  public int getDataBlockVersionType() {
     return VERSION + (Type.METADATA.ordinal() << DataBlockUtils.VERSION_TYPE_SHIFT);
   }
 
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/common/datablock/RowDataBlock.java b/pinot-core/src/main/java/org/apache/pinot/core/common/datablock/RowDataBlock.java
index bece1a7eaf..eec72546f0 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/common/datablock/RowDataBlock.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/common/datablock/RowDataBlock.java
@@ -22,6 +22,8 @@ import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.Map;
 import org.apache.pinot.common.utils.DataSchema;
+import org.apache.pinot.core.common.ObjectSerDeUtils;
+import org.roaringbitmap.RoaringBitmap;
 
 
 /**
@@ -48,6 +50,29 @@ public class RowDataBlock extends BaseDataBlock {
     computeBlockObjectConstants();
   }
 
+  @Override
+  public RoaringBitmap getNullRowIds(int colId) {
+    // _fixedSizeData stores two ints per col's null bitmap: offset, and length.
+    int position = _numRows * _rowSizeInBytes + colId * Integer.BYTES * 2;
+    if (position >= _fixedSizeData.limit()) {
+      return null;
+    }
+
+    _fixedSizeData.position(position);
+    int offset = _fixedSizeData.getInt();
+    int bytesLength = _fixedSizeData.getInt();
+    RoaringBitmap nullBitmap;
+    if (bytesLength > 0) {
+      _variableSizeData.position(offset);
+      byte[] nullBitmapBytes = new byte[bytesLength];
+      _variableSizeData.get(nullBitmapBytes);
+      nullBitmap = ObjectSerDeUtils.ROARING_BITMAP_SER_DE.deserialize(nullBitmapBytes);
+    } else {
+      nullBitmap = new RoaringBitmap();
+    }
+    return nullBitmap;
+  }
+
   protected void computeBlockObjectConstants() {
     if (_dataSchema != null) {
       _columnOffsets = new int[_numColumns];
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/common/datatable/BaseDataTable.java b/pinot-core/src/main/java/org/apache/pinot/core/common/datatable/BaseDataTable.java
index 788fe62765..a54079eb66 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/common/datatable/BaseDataTable.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/common/datatable/BaseDataTable.java
@@ -32,6 +32,7 @@ import org.apache.pinot.core.common.ObjectSerDeUtils;
 import org.apache.pinot.spi.utils.BigDecimalUtils;
 import org.apache.pinot.spi.utils.ByteArray;
 import org.apache.pinot.spi.utils.BytesUtils;
+import org.roaringbitmap.RoaringBitmap;
 
 import static java.nio.charset.StandardCharsets.UTF_8;
 
@@ -204,6 +205,10 @@ public abstract class BaseDataTable implements DataTable {
   public <T> T getObject(int rowId, int colId) {
     int size = positionCursorInVariableBuffer(rowId, colId);
     int objectTypeValue = _variableSizeData.getInt();
+    if (size == 0) {
+      assert objectTypeValue == ObjectSerDeUtils.ObjectType.Null.getValue();
+      return null;
+    }
     ByteBuffer byteBuffer = _variableSizeData.slice();
     byteBuffer.limit(size);
     return ObjectSerDeUtils.deserialize(byteBuffer, objectTypeValue);
@@ -260,6 +265,11 @@ public abstract class BaseDataTable implements DataTable {
     return strings;
   }
 
+  @Override
+  public RoaringBitmap getNullRowIds(int colId) {
+    return null;
+  }
+
   private int positionCursorInVariableBuffer(int rowId, int colId) {
     _fixedSizeData.position(rowId * _rowSizeInBytes + _columnOffsets[colId]);
     _variableSizeData.position(_fixedSizeData.getInt());
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/common/datatable/DataTableBuilder.java b/pinot-core/src/main/java/org/apache/pinot/core/common/datatable/DataTableBuilder.java
index 05f29bd774..941fabc2ee 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/common/datatable/DataTableBuilder.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/common/datatable/DataTableBuilder.java
@@ -27,9 +27,11 @@ import java.util.HashMap;
 import java.util.Map;
 import org.apache.pinot.common.utils.DataSchema;
 import org.apache.pinot.common.utils.DataTable;
+import org.apache.pinot.core.common.NullBitmapUtils;
 import org.apache.pinot.core.common.ObjectSerDeUtils;
 import org.apache.pinot.spi.utils.BigDecimalUtils;
 import org.apache.pinot.spi.utils.ByteArray;
+import org.roaringbitmap.RoaringBitmap;
 
 
 /**
@@ -127,11 +129,22 @@ public class DataTableBuilder {
     _version = version;
   }
 
+  public static int getCurrentDataTableVersion() {
+    return _version;
+  }
+
   public void startRow() {
     _numRows++;
     _currentRowDataByteBuffer = ByteBuffer.allocate(_rowSizeInBytes);
   }
 
+  public void setNullRowIds(RoaringBitmap nullBitmap)
+      throws IOException {
+    assert _version >= VERSION_4;
+    NullBitmapUtils.setNullRowIds(nullBitmap, _fixedSizeDataByteArrayOutputStream,
+        _variableSizeDataByteArrayOutputStream);
+  }
+
   public void setColumn(int colId, boolean value) {
     _currentRowDataByteBuffer.position(_columnOffsets[colId]);
     if (value) {
@@ -223,10 +236,15 @@ public class DataTableBuilder {
     _currentRowDataByteBuffer.position(_columnOffsets[colId]);
     _currentRowDataByteBuffer.putInt(_variableSizeDataByteArrayOutputStream.size());
     int objectTypeValue = ObjectSerDeUtils.ObjectType.getObjectType(value).getValue();
-    byte[] bytes = ObjectSerDeUtils.serialize(value, objectTypeValue);
-    _currentRowDataByteBuffer.putInt(bytes.length);
-    _variableSizeDataOutputStream.writeInt(objectTypeValue);
-    _variableSizeDataByteArrayOutputStream.write(bytes);
+    if (objectTypeValue == ObjectSerDeUtils.ObjectType.Null.getValue()) {
+      _currentRowDataByteBuffer.putInt(0);
+      _variableSizeDataOutputStream.writeInt(objectTypeValue);
+    } else {
+      byte[] bytes = ObjectSerDeUtils.serialize(value, objectTypeValue);
+      _currentRowDataByteBuffer.putInt(bytes.length);
+      _variableSizeDataOutputStream.writeInt(objectTypeValue);
+      _variableSizeDataByteArrayOutputStream.write(bytes);
+    }
   }
 
   public void setColumn(int colId, int[] values)
diff --git a/pinot-core/src/test/java/org/apache/pinot/core/common/datablock/DataBlockTest.java b/pinot-core/src/test/java/org/apache/pinot/core/common/datablock/DataBlockTest.java
index 4ede7f2cd7..d7ec9a7bfa 100644
--- a/pinot-core/src/test/java/org/apache/pinot/core/common/datablock/DataBlockTest.java
+++ b/pinot-core/src/test/java/org/apache/pinot/core/common/datablock/DataBlockTest.java
@@ -109,7 +109,7 @@ public class DataBlockTest {
     DataSchema dataSchema = new DataSchema(columnNames, columnDataTypes);
     List<Object[]> rows = DataBlockTestUtils.getRandomRows(dataSchema, TEST_ROW_COUNT);
     List<Object[]> columnars = DataBlockTestUtils.convertColumnar(dataSchema, rows);
-    RowDataBlock rowBlock = DataBlockBuilder.buildFromRows(rows, dataSchema);
+    RowDataBlock rowBlock = DataBlockBuilder.buildFromRows(rows, null, dataSchema);
     ColumnarDataBlock columnarBlock = DataBlockBuilder.buildFromColumns(columnars, dataSchema);
 
     for (int colId = 0; colId < dataSchema.getColumnNames().length; colId++) {
diff --git a/pinot-core/src/test/java/org/apache/pinot/core/common/datatable/DataTableSerDeTest.java b/pinot-core/src/test/java/org/apache/pinot/core/common/datatable/DataTableSerDeTest.java
index 8dbc3910f6..fbe9d98ed9 100644
--- a/pinot-core/src/test/java/org/apache/pinot/core/common/datatable/DataTableSerDeTest.java
+++ b/pinot-core/src/test/java/org/apache/pinot/core/common/datatable/DataTableSerDeTest.java
@@ -38,6 +38,7 @@ import org.apache.pinot.common.utils.DataTable;
 import org.apache.pinot.common.utils.DataTable.MetadataKey;
 import org.apache.pinot.core.query.request.context.ThreadTimer;
 import org.apache.pinot.spi.utils.ByteArray;
+import org.roaringbitmap.RoaringBitmap;
 import org.testng.Assert;
 import org.testng.annotations.Test;
 
@@ -219,7 +220,7 @@ public class DataTableSerDeTest {
     DataTableBuilder.setCurrentDataTableVersion(DataTableBuilder.VERSION_4);
     DataTableBuilder dataTableBuilderV4WithDataOnly = new DataTableBuilder(dataSchema);
     fillDataTableWithRandomData(dataTableBuilderV4WithDataOnly, columnDataTypes, numColumns);
-    DataTable dataTableV4 = dataTableBuilderV4WithDataOnly.build(); // create a V3 data table
+    DataTable dataTableV4 = dataTableBuilderV4WithDataOnly.build(); // create a V4 data table
     // Deserialize data table bytes as V4
     newDataTable = DataTableFactory.getDataTable(dataTableV4.toBytes());
     Assert.assertEquals(newDataTable.getDataSchema(), dataSchema, ERROR_MESSAGE);
@@ -269,7 +270,7 @@ public class DataTableSerDeTest {
     for (String key : EXPECTED_METADATA.keySet()) {
       dataTableV4.getMetadata().put(key, EXPECTED_METADATA.get(key));
     }
-    // Deserialize data table bytes as V3
+    // Deserialize data table bytes as V4
     newDataTable = DataTableFactory.getDataTable(dataTableV4.toBytes()); // Broker deserialize data table bytes as V4
     Assert.assertEquals(newDataTable.getDataSchema(), dataSchema, ERROR_MESSAGE);
     Assert.assertEquals(newDataTable.getNumberOfRows(), NUM_ROWS, ERROR_MESSAGE);
@@ -462,7 +463,7 @@ public class DataTableSerDeTest {
   }
 
   @Test
-  public void testDataTableMetadataBytesLayout()
+  public void testDataTableVer3MetadataBytesLayout()
       throws IOException {
     DataSchema.ColumnDataType[] columnDataTypes = DataSchema.ColumnDataType.values();
     int numColumns = columnDataTypes.length;
@@ -471,7 +472,9 @@ public class DataTableSerDeTest {
       columnNames[i] = columnDataTypes[i].name();
     }
 
+    ThreadTimer.setThreadCpuTimeMeasurementEnabled(false);
     DataSchema dataSchema = new DataSchema(columnNames, columnDataTypes);
+    DataTableBuilder.setCurrentDataTableVersion(DataTableBuilder.VERSION_3);
     DataTableBuilder dataTableBuilder = new DataTableBuilder(dataSchema);
     fillDataTableWithRandomData(dataTableBuilder, columnDataTypes, numColumns);
 
@@ -506,7 +509,7 @@ public class DataTableSerDeTest {
     try (ByteArrayInputStream byteArrayInputStream = new ByteArrayInputStream(metadataBytes);
         DataInputStream dataInputStream = new DataInputStream(byteArrayInputStream)) {
       int numEntries = dataInputStream.readInt();
-      // DataTable V3 serialization logic will add an extra RESPONSE_SER_CPU_TIME_NS KV pair into metadata
+      // DataTable V3 and V4 serialization logic will add an extra RESPONSE_SER_CPU_TIME_NS KV pair into metadata
       Assert.assertEquals(numEntries, EXPECTED_METADATA.size());
       for (int i = 0; i < numEntries; i++) {
         int keyOrdinal = dataInputStream.readInt();
@@ -538,53 +541,65 @@ public class DataTableSerDeTest {
   private void fillDataTableWithRandomData(DataTableBuilder dataTableBuilder,
       DataSchema.ColumnDataType[] columnDataTypes, int numColumns)
       throws IOException {
+    RoaringBitmap[] nullBitmaps = null;
+    if (DataTableBuilder.getCurrentDataTableVersion() >= DataTableBuilder.VERSION_4) {
+      nullBitmaps = new RoaringBitmap[numColumns];
+      for (int colId = 0; colId < numColumns; colId++) {
+        nullBitmaps[colId] = new RoaringBitmap();
+      }
+    }
     for (int rowId = 0; rowId < NUM_ROWS; rowId++) {
       dataTableBuilder.startRow();
       for (int colId = 0; colId < numColumns; colId++) {
+        // Note: isNull is handled for SV columns only for now.
+        boolean isNull = nullBitmaps != null && RANDOM.nextFloat() < 0.1;
+        if (isNull) {
+          nullBitmaps[colId].add(rowId);
+        }
         switch (columnDataTypes[colId]) {
           case INT:
-            INTS[rowId] = RANDOM.nextInt();
+            INTS[rowId] = isNull ? 0 : RANDOM.nextInt();
             dataTableBuilder.setColumn(colId, INTS[rowId]);
             break;
           case LONG:
-            LONGS[rowId] = RANDOM.nextLong();
+            LONGS[rowId] = isNull ? 0 : RANDOM.nextLong();
             dataTableBuilder.setColumn(colId, LONGS[rowId]);
             break;
           case FLOAT:
-            FLOATS[rowId] = RANDOM.nextFloat();
+            FLOATS[rowId] = isNull ? 0 : RANDOM.nextFloat();
             dataTableBuilder.setColumn(colId, FLOATS[rowId]);
             break;
           case DOUBLE:
-            DOUBLES[rowId] = RANDOM.nextDouble();
+            DOUBLES[rowId] = isNull ? 0.0 : RANDOM.nextDouble();
             dataTableBuilder.setColumn(colId, DOUBLES[rowId]);
             break;
           case BIG_DECIMAL:
-            BIG_DECIMALS[rowId] = BigDecimal.valueOf(RANDOM.nextDouble());
+            BIG_DECIMALS[rowId] = isNull ? BigDecimal.ZERO : BigDecimal.valueOf(RANDOM.nextDouble());
             dataTableBuilder.setColumn(colId, BIG_DECIMALS[rowId]);
             break;
           case TIMESTAMP:
-            TIMESTAMPS[rowId] = RANDOM.nextLong();
+            TIMESTAMPS[rowId] = isNull ? 0 : RANDOM.nextLong();
             dataTableBuilder.setColumn(colId, TIMESTAMPS[rowId]);
             break;
           case BOOLEAN:
-            BOOLEANS[rowId] = RANDOM.nextInt(2);
+            BOOLEANS[rowId] = isNull ? 0 : RANDOM.nextInt(2);
             dataTableBuilder.setColumn(colId, BOOLEANS[rowId]);
             break;
           case STRING:
-            STRINGS[rowId] = RandomStringUtils.random(RANDOM.nextInt(20));
+            STRINGS[rowId] = isNull ? "" : RandomStringUtils.random(RANDOM.nextInt(20));
             dataTableBuilder.setColumn(colId, STRINGS[rowId]);
             break;
           case JSON:
-            JSONS[rowId] = "{\"key\": \"" + RandomStringUtils.random(RANDOM.nextInt(20)) + "\"}";
+            JSONS[rowId] = isNull ? "" : "{\"key\": \"" + RandomStringUtils.random(RANDOM.nextInt(20)) + "\"}";
             dataTableBuilder.setColumn(colId, JSONS[rowId]);
             break;
           case BYTES:
-            BYTES[rowId] = RandomStringUtils.random(RANDOM.nextInt(20)).getBytes();
+            BYTES[rowId] = isNull ? new byte[0] : RandomStringUtils.random(RANDOM.nextInt(20)).getBytes();
             dataTableBuilder.setColumn(colId, new ByteArray(BYTES[rowId]));
             break;
           // Just test Double here, all object types will be covered in ObjectCustomSerDeTest.
           case OBJECT:
-            OBJECTS[rowId] = RANDOM.nextDouble();
+            OBJECTS[rowId] = isNull ? null : RANDOM.nextDouble();
             dataTableBuilder.setColumn(colId, OBJECTS[rowId]);
             break;
           case INT_ARRAY:
@@ -659,44 +674,56 @@ public class DataTableSerDeTest {
       }
       dataTableBuilder.finishRow();
     }
+    if (nullBitmaps != null) {
+      for (int colId = 0; colId < numColumns; colId++) {
+        dataTableBuilder.setNullRowIds(nullBitmaps[colId]);
+      }
+    }
   }
 
   private void verifyDataIsSame(DataTable newDataTable, DataSchema.ColumnDataType[] columnDataTypes, int numColumns) {
+    RoaringBitmap[] nullBitmaps = new RoaringBitmap[numColumns];
+    for (int colId = 0; colId < numColumns; colId++) {
+      nullBitmaps[colId] = newDataTable.getNullRowIds(colId);
+    }
     for (int rowId = 0; rowId < NUM_ROWS; rowId++) {
       for (int colId = 0; colId < numColumns; colId++) {
+        boolean isNull = nullBitmaps[colId] != null && nullBitmaps[colId].contains(rowId);
         switch (columnDataTypes[colId]) {
           case INT:
-            Assert.assertEquals(newDataTable.getInt(rowId, colId), INTS[rowId], ERROR_MESSAGE);
+            Assert.assertEquals(newDataTable.getInt(rowId, colId), isNull ? 0 : INTS[rowId], ERROR_MESSAGE);
             break;
           case LONG:
-            Assert.assertEquals(newDataTable.getLong(rowId, colId), LONGS[rowId], ERROR_MESSAGE);
+            Assert.assertEquals(newDataTable.getLong(rowId, colId), isNull ? 0 : LONGS[rowId], ERROR_MESSAGE);
             break;
           case FLOAT:
-            Assert.assertEquals(newDataTable.getFloat(rowId, colId), FLOATS[rowId], ERROR_MESSAGE);
+            Assert.assertEquals(newDataTable.getFloat(rowId, colId), isNull ? 0 : FLOATS[rowId], ERROR_MESSAGE);
             break;
           case DOUBLE:
-            Assert.assertEquals(newDataTable.getDouble(rowId, colId), DOUBLES[rowId], ERROR_MESSAGE);
+            Assert.assertEquals(newDataTable.getDouble(rowId, colId), isNull ? 0.0 : DOUBLES[rowId], ERROR_MESSAGE);
             break;
           case BIG_DECIMAL:
-            Assert.assertEquals(newDataTable.getBigDecimal(rowId, colId), BIG_DECIMALS[rowId], ERROR_MESSAGE);
+            Assert.assertEquals(newDataTable.getBigDecimal(rowId, colId), isNull ? BigDecimal.ZERO
+                : BIG_DECIMALS[rowId], ERROR_MESSAGE);
             break;
           case BOOLEAN:
-            Assert.assertEquals(newDataTable.getInt(rowId, colId), BOOLEANS[rowId], ERROR_MESSAGE);
+            Assert.assertEquals(newDataTable.getInt(rowId, colId), isNull ? 0 : BOOLEANS[rowId], ERROR_MESSAGE);
             break;
           case TIMESTAMP:
-            Assert.assertEquals(newDataTable.getLong(rowId, colId), TIMESTAMPS[rowId], ERROR_MESSAGE);
+            Assert.assertEquals(newDataTable.getLong(rowId, colId), isNull ? 0 : TIMESTAMPS[rowId], ERROR_MESSAGE);
             break;
           case STRING:
-            Assert.assertEquals(newDataTable.getString(rowId, colId), STRINGS[rowId], ERROR_MESSAGE);
+            Assert.assertEquals(newDataTable.getString(rowId, colId), isNull ? "" : STRINGS[rowId], ERROR_MESSAGE);
             break;
           case JSON:
-            Assert.assertEquals(newDataTable.getString(rowId, colId), JSONS[rowId], ERROR_MESSAGE);
+            Assert.assertEquals(newDataTable.getString(rowId, colId), isNull ? "" : JSONS[rowId], ERROR_MESSAGE);
             break;
           case BYTES:
-            Assert.assertEquals(newDataTable.getBytes(rowId, colId).getBytes(), BYTES[rowId], ERROR_MESSAGE);
+            Assert.assertEquals(newDataTable.getBytes(rowId, colId).getBytes(), isNull ? new byte[0] : BYTES[rowId],
+                ERROR_MESSAGE);
             break;
           case OBJECT:
-            Assert.assertEquals(newDataTable.getObject(rowId, colId), OBJECTS[rowId], ERROR_MESSAGE);
+            Assert.assertEquals(newDataTable.getObject(rowId, colId), isNull ? null : OBJECTS[rowId], ERROR_MESSAGE);
             break;
           case INT_ARRAY:
             Assert.assertTrue(Arrays.equals(newDataTable.getIntArray(rowId, colId), INT_ARRAYS[rowId]), ERROR_MESSAGE);
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/HashJoinOperator.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/HashJoinOperator.java
index ae1bf43d4c..5bbe81428f 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/HashJoinOperator.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/HashJoinOperator.java
@@ -129,7 +129,7 @@ public class HashJoinOperator extends BaseOperator<TransferableBlock> {
         rows.add(joinRow(leftRow, rightRow));
       }
     }
-    return DataBlockBuilder.buildFromRows(rows, computeSchema());
+    return DataBlockBuilder.buildFromRows(rows, null, computeSchema());
   }
 
   private Object[] joinRow(Object[] leftRow, Object[] rightRow) {
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxSendOperator.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxSendOperator.java
index 8b5ccd5b99..cacf73572c 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxSendOperator.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxSendOperator.java
@@ -184,7 +184,7 @@ public class MailboxSendOperator extends BaseOperator<TransferableBlock> {
     List<BaseDataBlock> dataTableList = new ArrayList<>(partitionSize);
     for (int i = 0; i < partitionSize; i++) {
       List<Object[]> objects = temporaryRows.get(i);
-      dataTableList.add(DataBlockBuilder.buildFromRows(objects, dataTable.getDataSchema()));
+      dataTableList.add(DataBlockBuilder.buildFromRows(objects, null, dataTable.getDataSchema()));
     }
     return dataTableList;
   }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org