You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by si...@apache.org on 2022/09/20 19:52:13 UTC

[pinot] branch master updated: [multistage] support NULL in data blocks (#9427)

This is an automated email from the ASF dual-hosted git repository.

siddteotia pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 71b84452c9 [multistage] support NULL in data blocks (#9427)
71b84452c9 is described below

commit 71b84452c9a2d84b9e111e278479f21c81d321fc
Author: Rong Rong <ro...@apache.org>
AuthorDate: Tue Sep 20 12:52:06 2022 -0700

    [multistage] support NULL in data blocks (#9427)
    
    * initial commit for supporting null in datablock
    
    * add test according to comment
    
    Co-authored-by: Rong Rong <ro...@startree.ai>
---
 .../pinot/core/common/datablock/BaseDataBlock.java |  21 ++-
 .../core/common/datablock/ColumnarDataBlock.java   |   2 +
 .../core/common/datablock/DataBlockBuilder.java    | 166 ++++++++++++++++-----
 .../core/common/datablock/DataBlockUtils.java      |  12 +-
 .../pinot/core/common/datablock/RowDataBlock.java  |  25 +---
 .../pinot/core/common/datablock/DataBlockTest.java |  48 ++++--
 .../core/common/datablock/DataBlockTestUtils.java  |  28 ++--
 .../query/runtime/blocks/TransferableBlock.java    |   6 +-
 .../runtime/operator/MailboxSendOperator.java      |   2 +-
 9 files changed, 216 insertions(+), 94 deletions(-)

diff --git a/pinot-core/src/main/java/org/apache/pinot/core/common/datablock/BaseDataBlock.java b/pinot-core/src/main/java/org/apache/pinot/core/common/datablock/BaseDataBlock.java
index 74d42f6628..ce7da42063 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/common/datablock/BaseDataBlock.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/common/datablock/BaseDataBlock.java
@@ -86,6 +86,7 @@ public abstract class BaseDataBlock implements DataTable {
 
   protected int _numRows;
   protected int _numColumns;
+  protected int _fixDataSize;
   protected DataSchema _dataSchema;
   protected String[] _stringDictionary;
   protected byte[] _fixedSizeDataBytes;
@@ -107,6 +108,7 @@ public abstract class BaseDataBlock implements DataTable {
     _numRows = numRows;
     _dataSchema = dataSchema;
     _numColumns = dataSchema == null ? 0 : dataSchema.size();
+    _fixDataSize = 0;
     _stringDictionary = stringDictionary;
     _fixedSizeDataBytes = fixedSizeDataBytes;
     _fixedSizeData = ByteBuffer.wrap(fixedSizeDataBytes);
@@ -122,6 +124,7 @@ public abstract class BaseDataBlock implements DataTable {
   public BaseDataBlock() {
     _numRows = 0;
     _numColumns = 0;
+    _fixDataSize = 0;
     _dataSchema = null;
     _stringDictionary = null;
     _fixedSizeDataBytes = null;
@@ -249,7 +252,23 @@ public abstract class BaseDataBlock implements DataTable {
   @Nullable
   @Override
   public RoaringBitmap getNullRowIds(int colId) {
-    return null;
+    // _fixedSizeData stores two ints per column's null bitmap: its offset into _variableSizeData and its length.
+    int position = _fixDataSize + colId * Integer.BYTES * 2;
+    if (_fixedSizeData == null || position >= _fixedSizeData.limit()) {
+      return null;
+    }
+
+    _fixedSizeData.position(position);
+    int offset = _fixedSizeData.getInt();
+    int bytesLength = _fixedSizeData.getInt();
+    if (bytesLength > 0) {
+      _variableSizeData.position(offset);
+      byte[] nullBitmapBytes = new byte[bytesLength];
+      _variableSizeData.get(nullBitmapBytes);
+      return ObjectSerDeUtils.ROARING_BITMAP_SER_DE.deserialize(nullBitmapBytes);
+    } else {
+      return null;
+    }
   }
 
   // --------------------------------------------------------------------------
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/common/datablock/ColumnarDataBlock.java b/pinot-core/src/main/java/org/apache/pinot/core/common/datablock/ColumnarDataBlock.java
index 9a3cff57f7..cdf8ed0849 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/common/datablock/ColumnarDataBlock.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/common/datablock/ColumnarDataBlock.java
@@ -48,6 +48,7 @@ public class ColumnarDataBlock extends BaseDataBlock {
   }
 
   protected void computeBlockObjectConstants() {
+    _fixDataSize = 0;
     if (_dataSchema != null) {
       _cumulativeColumnOffsetSizeInBytes = new int[_numColumns];
       _columnSizeInBytes = new int[_numColumns];
@@ -57,6 +58,7 @@ public class ColumnarDataBlock extends BaseDataBlock {
         _cumulativeColumnOffsetSizeInBytes[i] = cumulativeColumnOffset;
         cumulativeColumnOffset += _columnSizeInBytes[i] * _numRows;
       }
+      _fixDataSize = cumulativeColumnOffset;
     }
   }
 
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/common/datablock/DataBlockBuilder.java b/pinot-core/src/main/java/org/apache/pinot/core/common/datablock/DataBlockBuilder.java
index 8a47f261ff..e4f9a76a85 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/common/datablock/DataBlockBuilder.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/common/datablock/DataBlockBuilder.java
@@ -38,7 +38,7 @@ import org.roaringbitmap.RoaringBitmap;
 public class DataBlockBuilder {
   private final DataSchema _dataSchema;
   private final BaseDataBlock.Type _blockType;
-  private final DataSchema.ColumnDataType[] _columnDataType;
+  private final DataSchema.ColumnDataType[] _columnDataTypes;
 
   private int[] _columnOffsets;
   private int _rowSizeInBytes;
@@ -57,7 +57,7 @@ public class DataBlockBuilder {
 
   private DataBlockBuilder(DataSchema dataSchema, BaseDataBlock.Type blockType) {
     _dataSchema = dataSchema;
-    _columnDataType = dataSchema.getStoredColumnDataTypes();
+    _columnDataTypes = dataSchema.getStoredColumnDataTypes();
     _blockType = blockType;
     _numColumns = dataSchema.size();
     if (_blockType == BaseDataBlock.Type.COLUMNAR) {
@@ -87,16 +87,30 @@ public class DataBlockBuilder {
     }
   }
 
-  public static RowDataBlock buildFromRows(List<Object[]> rows, @Nullable RoaringBitmap[] colNullBitmaps,
-      DataSchema dataSchema)
+  public static RowDataBlock buildFromRows(List<Object[]> rows, DataSchema dataSchema)
       throws IOException {
     DataBlockBuilder rowBuilder = new DataBlockBuilder(dataSchema, BaseDataBlock.Type.ROW);
+    // TODO: consolidate these null utils into data table utils.
+    // Selection / Agg / Distinct all have similar code.
+    int numColumns = rowBuilder._numColumns;
+    RoaringBitmap[] nullBitmaps = new RoaringBitmap[numColumns];
+    DataSchema.ColumnDataType[] storedColumnDataTypes = dataSchema.getStoredColumnDataTypes();
+    Object[] nullPlaceholders = new Object[numColumns];
+    for (int colId = 0; colId < numColumns; colId++) {
+      nullBitmaps[colId] = new RoaringBitmap();
+      nullPlaceholders[colId] = storedColumnDataTypes[colId].getNullPlaceholder();
+    }
     rowBuilder._numRows = rows.size();
-    for (Object[] row : rows) {
+    for (int rowId = 0; rowId < rows.size(); rowId++) {
+      Object[] row = rows.get(rowId);
       ByteBuffer byteBuffer = ByteBuffer.allocate(rowBuilder._rowSizeInBytes);
-      for (int i = 0; i < rowBuilder._numColumns; i++) {
-        Object value = row[i];
-        switch (rowBuilder._columnDataType[i]) {
+      for (int colId = 0; colId < rowBuilder._numColumns; colId++) {
+        Object value = row[colId];
+        if (value == null) {
+          nullBitmaps[colId].add(rowId);
+          value = nullPlaceholders[colId];
+        }
+        switch (rowBuilder._columnDataTypes[colId]) {
           // Single-value column
           case INT:
             byteBuffer.putInt(((Number) value).intValue());
@@ -168,86 +182,147 @@ public class DataBlockBuilder {
             }
             break;
           case BYTES_ARRAY:
+            setColumn(rowBuilder, byteBuffer, (byte[][]) value);
+            break;
           case STRING_ARRAY:
             setColumn(rowBuilder, byteBuffer, (String[]) value);
             break;
           default:
             throw new IllegalStateException(
-                String.format("Unsupported data type: %s for column: %s", rowBuilder._columnDataType[i],
-                    rowBuilder._dataSchema.getColumnName(i)));
+                String.format("Unsupported data type: %s for column: %s", rowBuilder._columnDataTypes[colId],
+                    rowBuilder._dataSchema.getColumnName(colId)));
         }
       }
       rowBuilder._fixedSizeDataByteArrayOutputStream.write(byteBuffer.array(), 0, byteBuffer.position());
     }
     // Write null bitmaps after writing data.
-    if (colNullBitmaps != null) {
-      for (RoaringBitmap nullBitmap : colNullBitmaps) {
-        rowBuilder.setNullRowIds(nullBitmap);
-      }
+    for (RoaringBitmap nullBitmap : nullBitmaps) {
+      rowBuilder.setNullRowIds(nullBitmap);
     }
     return buildRowBlock(rowBuilder);
   }
 
-  public static ColumnarDataBlock buildFromColumns(List<Object[]> columns, @Nullable RoaringBitmap[] colNullBitmaps,
-      DataSchema dataSchema)
+  public static ColumnarDataBlock buildFromColumns(List<Object[]> columns, DataSchema dataSchema)
       throws IOException {
     DataBlockBuilder columnarBuilder = new DataBlockBuilder(dataSchema, BaseDataBlock.Type.COLUMNAR);
-    for (int i = 0; i < columns.size(); i++) {
-      Object[] column = columns.get(i);
+
+    // TODO: consolidate these null utils into data table utils.
+    // Selection / Agg / Distinct all have similar code.
+    int numColumns = columnarBuilder._numColumns;
+    RoaringBitmap[] nullBitmaps = new RoaringBitmap[numColumns];
+    DataSchema.ColumnDataType[] storedColumnDataTypes = dataSchema.getStoredColumnDataTypes();
+    Object[] nullPlaceholders = new Object[numColumns];
+    for (int colId = 0; colId < numColumns; colId++) {
+      nullBitmaps[colId] = new RoaringBitmap();
+      nullPlaceholders[colId] = storedColumnDataTypes[colId].getNullPlaceholder();
+    }
+    for (int colId = 0; colId < columns.size(); colId++) {
+      Object[] column = columns.get(colId);
       columnarBuilder._numRows = column.length;
-      ByteBuffer byteBuffer = ByteBuffer.allocate(columnarBuilder._numRows * columnarBuilder._columnSizeInBytes[i]);
-      switch (columnarBuilder._columnDataType[i]) {
+      ByteBuffer byteBuffer = ByteBuffer.allocate(columnarBuilder._numRows * columnarBuilder._columnSizeInBytes[colId]);
+      Object value;
+      switch (columnarBuilder._columnDataTypes[colId]) {
         // Single-value column
         case INT:
-          for (Object value : column) {
+          for (int rowId = 0; rowId < columnarBuilder._numRows; rowId++) {
+            value = column[rowId];
+            if (value == null) {
+              nullBitmaps[colId].add(rowId);
+              value = nullPlaceholders[colId];
+            }
             byteBuffer.putInt(((Number) value).intValue());
           }
           break;
         case LONG:
-          for (Object value : column) {
+          for (int rowId = 0; rowId < columnarBuilder._numRows; rowId++) {
+            value = column[rowId];
+            if (value == null) {
+              nullBitmaps[colId].add(rowId);
+              value = nullPlaceholders[colId];
+            }
             byteBuffer.putLong(((Number) value).longValue());
           }
           break;
         case FLOAT:
-          for (Object value : column) {
+          for (int rowId = 0; rowId < columnarBuilder._numRows; rowId++) {
+            value = column[rowId];
+            if (value == null) {
+              nullBitmaps[colId].add(rowId);
+              value = nullPlaceholders[colId];
+            }
             byteBuffer.putFloat(((Number) value).floatValue());
           }
           break;
         case DOUBLE:
-          for (Object value : column) {
+          for (int rowId = 0; rowId < columnarBuilder._numRows; rowId++) {
+            value = column[rowId];
+            if (value == null) {
+              nullBitmaps[colId].add(rowId);
+              value = nullPlaceholders[colId];
+            }
             byteBuffer.putDouble(((Number) value).doubleValue());
           }
           break;
         case BIG_DECIMAL:
-          for (Object value : column) {
+          for (int rowId = 0; rowId < columnarBuilder._numRows; rowId++) {
+            value = column[rowId];
+            if (value == null) {
+              nullBitmaps[colId].add(rowId);
+              value = nullPlaceholders[colId];
+            }
             setColumn(columnarBuilder, byteBuffer, (BigDecimal) value);
           }
           break;
         case STRING:
-          for (Object value : column) {
+          for (int rowId = 0; rowId < columnarBuilder._numRows; rowId++) {
+            value = column[rowId];
+            if (value == null) {
+              nullBitmaps[colId].add(rowId);
+              value = nullPlaceholders[colId];
+            }
             setColumn(columnarBuilder, byteBuffer, (String) value);
           }
           break;
         case BYTES:
-          for (Object value : column) {
+          for (int rowId = 0; rowId < columnarBuilder._numRows; rowId++) {
+            value = column[rowId];
+            if (value == null) {
+              nullBitmaps[colId].add(rowId);
+              value = nullPlaceholders[colId];
+            }
             setColumn(columnarBuilder, byteBuffer, (ByteArray) value);
           }
           break;
         case OBJECT:
-          for (Object value : column) {
+          for (int rowId = 0; rowId < columnarBuilder._numRows; rowId++) {
+            value = column[rowId];
+            if (value == null) {
+              nullBitmaps[colId].add(rowId);
+              value = nullPlaceholders[colId];
+            }
             setColumn(columnarBuilder, byteBuffer, value);
           }
           break;
         // Multi-value column
         case BOOLEAN_ARRAY:
         case INT_ARRAY:
-          for (Object value : column) {
+          for (int rowId = 0; rowId < columnarBuilder._numRows; rowId++) {
+            value = column[rowId];
+            if (value == null) {
+              nullBitmaps[colId].add(rowId);
+              value = nullPlaceholders[colId];
+            }
             setColumn(columnarBuilder, byteBuffer, (int[]) value);
           }
           break;
         case TIMESTAMP_ARRAY:
         case LONG_ARRAY:
-          for (Object value : column) {
+          for (int rowId = 0; rowId < columnarBuilder._numRows; rowId++) {
+            value = column[rowId];
+            if (value == null) {
+              nullBitmaps[colId].add(rowId);
+              value = nullPlaceholders[colId];
+            }
             if (value instanceof int[]) {
               // LONG_ARRAY type covers INT_ARRAY and LONG_ARRAY
               int[] ints = (int[]) value;
@@ -261,12 +336,22 @@ public class DataBlockBuilder {
           }
           break;
         case FLOAT_ARRAY:
-          for (Object value : column) {
+          for (int rowId = 0; rowId < columnarBuilder._numRows; rowId++) {
+            value = column[rowId];
+            if (value == null) {
+              nullBitmaps[colId].add(rowId);
+              value = nullPlaceholders[colId];
+            }
             setColumn(columnarBuilder, byteBuffer, (float[]) value);
           }
           break;
         case DOUBLE_ARRAY:
-          for (Object value : column) {
+          for (int rowId = 0; rowId < columnarBuilder._numRows; rowId++) {
+            value = column[rowId];
+            if (value == null) {
+              nullBitmaps[colId].add(rowId);
+              value = nullPlaceholders[colId];
+            }
             // DOUBLE_ARRAY type covers INT_ARRAY, LONG_ARRAY, FLOAT_ARRAY and DOUBLE_ARRAY
             if (value instanceof int[]) {
               int[] ints = (int[]) value;
@@ -293,22 +378,25 @@ public class DataBlockBuilder {
           break;
         case BYTES_ARRAY:
         case STRING_ARRAY:
-          for (Object value : column) {
+          for (int rowId = 0; rowId < columnarBuilder._numRows; rowId++) {
+            value = column[rowId];
+            if (value == null) {
+              nullBitmaps[colId].add(rowId);
+              value = nullPlaceholders[colId];
+            }
             setColumn(columnarBuilder, byteBuffer, (String[]) value);
           }
           break;
         default:
           throw new IllegalStateException(
-              String.format("Unsupported data type: %s for column: %s", columnarBuilder._columnDataType[i],
-                  columnarBuilder._dataSchema.getColumnName(i)));
+              String.format("Unsupported data type: %s for column: %s", columnarBuilder._columnDataTypes[colId],
+                  columnarBuilder._dataSchema.getColumnName(colId)));
       }
       columnarBuilder._fixedSizeDataByteArrayOutputStream.write(byteBuffer.array(), 0, byteBuffer.position());
     }
     // Write null bitmaps after writing data.
-    if (colNullBitmaps != null) {
-      for (RoaringBitmap nullBitmap : colNullBitmaps) {
-        columnarBuilder.setNullRowIds(nullBitmap);
-      }
+    for (RoaringBitmap nullBitmap : nullBitmaps) {
+      columnarBuilder.setNullRowIds(nullBitmap);
     }
     return buildColumnarBlock(columnarBuilder);
   }
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/common/datablock/DataBlockUtils.java b/pinot-core/src/main/java/org/apache/pinot/core/common/datablock/DataBlockUtils.java
index d1d9185d11..88cdf8f2db 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/common/datablock/DataBlockUtils.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/common/datablock/DataBlockUtils.java
@@ -28,6 +28,7 @@ import javax.annotation.Nonnull;
 import org.apache.pinot.common.exception.QueryException;
 import org.apache.pinot.common.response.ProcessingException;
 import org.apache.pinot.common.utils.DataSchema;
+import org.roaringbitmap.RoaringBitmap;
 
 
 public final class DataBlockUtils {
@@ -74,7 +75,7 @@ public final class DataBlockUtils {
     }
   }
 
-  public static List<Object[]> extraRows(BaseDataBlock dataBlock) {
+  public static List<Object[]> extractRows(BaseDataBlock dataBlock) {
     DataSchema dataSchema = dataBlock.getDataSchema();
     DataSchema.ColumnDataType[] storedColumnDataTypes = dataSchema.getStoredColumnDataTypes();
     int numRows = dataBlock.getNumberOfRows();
@@ -133,6 +134,15 @@ public final class DataBlockUtils {
       }
       rows.add(row);
     }
+
+    for (int colId = 0; colId < numColumns; colId++) {
+      RoaringBitmap nullBitmap = dataBlock.getNullRowIds(colId);
+      if (nullBitmap != null) {
+        for (Integer rowId : nullBitmap) {
+          rows.get(rowId)[colId] = null;
+        }
+      }
+    }
     return rows;
   }
 
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/common/datablock/RowDataBlock.java b/pinot-core/src/main/java/org/apache/pinot/core/common/datablock/RowDataBlock.java
index fdfea0f848..5bdf121fad 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/common/datablock/RowDataBlock.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/common/datablock/RowDataBlock.java
@@ -20,10 +20,7 @@ package org.apache.pinot.core.common.datablock;
 
 import java.io.IOException;
 import java.nio.ByteBuffer;
-import javax.annotation.Nullable;
 import org.apache.pinot.common.utils.DataSchema;
-import org.apache.pinot.core.common.ObjectSerDeUtils;
-import org.roaringbitmap.RoaringBitmap;
 
 
 /**
@@ -50,31 +47,11 @@ public class RowDataBlock extends BaseDataBlock {
     computeBlockObjectConstants();
   }
 
-  @Nullable
-  @Override
-  public RoaringBitmap getNullRowIds(int colId) {
-    // _fixedSizeData stores two ints per col's null bitmap: offset, and length.
-    int position = _numRows * _rowSizeInBytes + colId * Integer.BYTES * 2;
-    if (_fixedSizeData == null || position >= _fixedSizeData.limit()) {
-      return null;
-    }
-
-    _fixedSizeData.position(position);
-    int offset = _fixedSizeData.getInt();
-    int bytesLength = _fixedSizeData.getInt();
-    if (bytesLength > 0) {
-      _variableSizeData.position(offset);
-      byte[] nullBitmapBytes = new byte[bytesLength];
-      _variableSizeData.get(nullBitmapBytes);
-      return ObjectSerDeUtils.ROARING_BITMAP_SER_DE.deserialize(nullBitmapBytes);
-    }
-    return null;
-  }
-
   protected void computeBlockObjectConstants() {
     if (_dataSchema != null) {
       _columnOffsets = new int[_numColumns];
       _rowSizeInBytes = DataBlockUtils.computeColumnOffsets(_dataSchema, _columnOffsets);
+      _fixDataSize = _numRows * _rowSizeInBytes;
     }
   }
 
diff --git a/pinot-core/src/test/java/org/apache/pinot/core/common/datablock/DataBlockTest.java b/pinot-core/src/test/java/org/apache/pinot/core/common/datablock/DataBlockTest.java
index f673d2433d..70ca2e067e 100644
--- a/pinot-core/src/test/java/org/apache/pinot/core/common/datablock/DataBlockTest.java
+++ b/pinot-core/src/test/java/org/apache/pinot/core/common/datablock/DataBlockTest.java
@@ -29,7 +29,9 @@ import org.apache.pinot.common.utils.DataSchema;
 import org.apache.pinot.common.utils.DataTable;
 import org.apache.pinot.core.common.datatable.DataTableFactory;
 import org.apache.pinot.core.query.selection.SelectionOperatorUtils;
+import org.roaringbitmap.RoaringBitmap;
 import org.testng.Assert;
+import org.testng.annotations.DataProvider;
 import org.testng.annotations.Test;
 
 
@@ -64,8 +66,8 @@ public class DataBlockTest {
    *
    * @throws Exception
    */
-  @Test
-  public void testRowDataBlockCompatibleWithDataTableV4()
+  @Test(dataProvider = "testTypeNullPercentile")
+  public void testRowDataBlockCompatibleWithDataTableV4(int nullPercentile)
       throws Exception {
     DataSchema.ColumnDataType[] allDataTypes = DataSchema.ColumnDataType.values();
     List<DataSchema.ColumnDataType> columnDataTypes = new ArrayList<DataSchema.ColumnDataType>();
@@ -79,9 +81,9 @@ public class DataBlockTest {
 
     DataSchema dataSchema = new DataSchema(columnNames.toArray(new String[0]),
         columnDataTypes.toArray(new DataSchema.ColumnDataType[0]));
-    List<Object[]> rows = DataBlockTestUtils.getRandomRows(dataSchema, TEST_ROW_COUNT);
+    List<Object[]> rows = DataBlockTestUtils.getRandomRows(dataSchema, TEST_ROW_COUNT, nullPercentile);
     DataTableFactory.setDataTableVersion(DataTableFactory.VERSION_4);
-    DataTable dataTableImpl = SelectionOperatorUtils.getDataTableFromRows(rows, dataSchema, false);
+    DataTable dataTableImpl = SelectionOperatorUtils.getDataTableFromRows(rows, dataSchema, true);
     DataTable dataBlockFromDataTable = DataBlockUtils.getDataBlock(ByteBuffer.wrap(dataTableImpl.toBytes()));
 
     for (int rowId = 0; rowId < TEST_ROW_COUNT; rowId++) {
@@ -93,23 +95,34 @@ public class DataBlockTest {
             + " from DataBlock: [" + rowFromBlock[rowId] + "], from DataTable: [" + rowFromDataTable[colId] + "]");
       }
     }
+
+    for (int colId = 0; colId < dataSchema.getColumnNames().length; colId++) {
+      RoaringBitmap dataBlockBitmap = dataBlockFromDataTable.getNullRowIds(colId);
+      RoaringBitmap dataTableBitmap = dataTableImpl.getNullRowIds(colId);
+      Assert.assertEquals(dataBlockBitmap, dataTableBitmap);
+    }
   }
 
-  @Test
-  public void testAllDataTypes()
+  @Test(dataProvider = "testTypeNullPercentile")
+  public void testAllDataTypes(int nullPercentile)
       throws Exception {
-    DataSchema.ColumnDataType[] columnDataTypes = DataSchema.ColumnDataType.values();
-    int numColumns = columnDataTypes.length;
-    String[] columnNames = new String[numColumns];
-    for (int i = 0; i < numColumns; i++) {
-      columnNames[i] = columnDataTypes[i].name();
+
+    DataSchema.ColumnDataType[] allDataTypes = DataSchema.ColumnDataType.values();
+    List<DataSchema.ColumnDataType> columnDataTypes = new ArrayList<DataSchema.ColumnDataType>();
+    List<String> columnNames = new ArrayList<String>();
+    for (int i = 0; i < allDataTypes.length; i++) {
+      if (!EXCLUDE_DATA_TYPES.contains(allDataTypes[i])) {
+        columnNames.add(allDataTypes[i].name());
+        columnDataTypes.add(allDataTypes[i]);
+      }
     }
 
-    DataSchema dataSchema = new DataSchema(columnNames, columnDataTypes);
-    List<Object[]> rows = DataBlockTestUtils.getRandomRows(dataSchema, TEST_ROW_COUNT);
+    DataSchema dataSchema = new DataSchema(columnNames.toArray(new String[]{}),
+        columnDataTypes.toArray(new DataSchema.ColumnDataType[]{}));
+    List<Object[]> rows = DataBlockTestUtils.getRandomRows(dataSchema, TEST_ROW_COUNT, nullPercentile);
     List<Object[]> columnars = DataBlockTestUtils.convertColumnar(dataSchema, rows);
-    RowDataBlock rowBlock = DataBlockBuilder.buildFromRows(rows, null, dataSchema);
-    ColumnarDataBlock columnarBlock = DataBlockBuilder.buildFromColumns(columnars, null, dataSchema);
+    RowDataBlock rowBlock = DataBlockBuilder.buildFromRows(rows, dataSchema);
+    ColumnarDataBlock columnarBlock = DataBlockBuilder.buildFromColumns(columnars, dataSchema);
 
     for (int colId = 0; colId < dataSchema.getColumnNames().length; colId++) {
       DataSchema.ColumnDataType columnDataType = dataSchema.getColumnDataType(colId);
@@ -121,4 +134,9 @@ public class DataBlockTest {
       }
     }
   }
+
+  @DataProvider(name = "testTypeNullPercentile")
+  public Object[][] provideTestTypeNullPercentile() {
+    return new Object[][]{new Object[]{0}, new Object[]{10}, new Object[]{100}};
+  }
 }
diff --git a/pinot-core/src/test/java/org/apache/pinot/core/common/datablock/DataBlockTestUtils.java b/pinot-core/src/test/java/org/apache/pinot/core/common/datablock/DataBlockTestUtils.java
index 54f046d13f..e56ccc422b 100644
--- a/pinot-core/src/test/java/org/apache/pinot/core/common/datablock/DataBlockTestUtils.java
+++ b/pinot-core/src/test/java/org/apache/pinot/core/common/datablock/DataBlockTestUtils.java
@@ -25,6 +25,7 @@ import java.util.Random;
 import org.apache.commons.lang.RandomStringUtils;
 import org.apache.pinot.common.utils.DataSchema;
 import org.apache.pinot.spi.utils.ByteArray;
+import org.roaringbitmap.RoaringBitmap;
 
 
 public class DataBlockTestUtils {
@@ -35,7 +36,7 @@ public class DataBlockTestUtils {
     // do not instantiate.
   }
 
-  public static Object[] getRandomRow(DataSchema dataSchema) {
+  public static Object[] getRandomRow(DataSchema dataSchema, int nullPercentile) {
     final int numColumns = dataSchema.getColumnNames().length;
     DataSchema.ColumnDataType[] columnDataTypes = dataSchema.getColumnDataTypes();
     Object[] row = new Object[numColumns];
@@ -100,7 +101,6 @@ public class DataBlockTestUtils {
           }
           row[colId] = doubleArray;
           break;
-        case BYTES_ARRAY:
         case STRING_ARRAY:
           length = RANDOM.nextInt(ARRAY_SIZE);
           String[] stringArray = new String[length];
@@ -112,12 +112,22 @@ public class DataBlockTestUtils {
         default:
           throw new UnsupportedOperationException("Can't fill random data for column type: " + columnDataTypes[colId]);
       }
+      // Randomly replace this entry with null; columns whose stored type is OBJECT are skipped.
+      if (columnDataTypes[colId].getStoredType() != DataSchema.ColumnDataType.OBJECT) {
+        row[colId] = randomlySettingNull(nullPercentile) ? null : row[colId];
+      }
     }
     return row;
   }
 
   public static Object getElement(BaseDataBlock dataBlock, int rowId, int colId,
       DataSchema.ColumnDataType columnDataType) {
+    RoaringBitmap nullBitmap = dataBlock.getNullRowIds(colId);
+    if (nullBitmap != null) {
+      if (nullBitmap.contains(rowId)) {
+        return null;
+      }
+    }
     switch (columnDataType.getStoredType()) {
       case INT:
         return dataBlock.getInt(rowId, colId);
@@ -145,7 +155,6 @@ public class DataBlockTestUtils {
         return dataBlock.getFloatArray(rowId, colId);
       case DOUBLE_ARRAY:
         return dataBlock.getDoubleArray(rowId, colId);
-      case BYTES_ARRAY:
       case STRING_ARRAY:
         return dataBlock.getStringArray(rowId, colId);
       default:
@@ -153,19 +162,14 @@ public class DataBlockTestUtils {
     }
   }
 
-  public static List<Object[]> getRandomRows(DataSchema dataSchema, int numRows) {
+  public static List<Object[]> getRandomRows(DataSchema dataSchema, int numRows, int nullPercentile) {
     List<Object[]> rows = new ArrayList<>(numRows);
     for (int i = 0; i < numRows; i++) {
-      rows.add(getRandomRow(dataSchema));
+      rows.add(getRandomRow(dataSchema, nullPercentile));
     }
     return rows;
   }
 
-  public static List<Object[]> getRandomColumnar(DataSchema dataSchema, int numRows) {
-    List<Object[]> rows = getRandomRows(dataSchema, numRows);
-    return convertColumnar(dataSchema, rows);
-  }
-
   public static List<Object[]> convertColumnar(DataSchema dataSchema, List<Object[]> rows) {
     final int numRows = rows.size();
     final int numColumns = dataSchema.getColumnNames().length;
@@ -178,4 +182,8 @@ public class DataBlockTestUtils {
     }
     return columnars;
   }
+
+  public static boolean randomlySettingNull(int percentile) {
+    return RANDOM.nextInt(100) >= (100 - percentile);
+  }
 }
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/blocks/TransferableBlock.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/blocks/TransferableBlock.java
index 775a5de6ef..d9b4c31db0 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/blocks/TransferableBlock.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/blocks/TransferableBlock.java
@@ -83,7 +83,7 @@ public class TransferableBlock implements Block {
     if (_container == null) {
       switch (_type) {
         case ROW:
-          _container = DataBlockUtils.extraRows(_dataBlock);
+          _container = DataBlockUtils.extractRows(_dataBlock);
           break;
         case COLUMNAR:
         default:
@@ -105,10 +105,10 @@ public class TransferableBlock implements Block {
       try {
         switch (_type) {
           case ROW:
-            _dataBlock = DataBlockBuilder.buildFromRows(_container, null, _dataSchema);
+            _dataBlock = DataBlockBuilder.buildFromRows(_container, _dataSchema);
             break;
           case COLUMNAR:
-            _dataBlock = DataBlockBuilder.buildFromColumns(_container, null, _dataSchema);
+            _dataBlock = DataBlockBuilder.buildFromColumns(_container, _dataSchema);
             break;
           case METADATA:
             throw new UnsupportedOperationException("Metadata block cannot be constructed from container");
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxSendOperator.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxSendOperator.java
index 32375dc4fa..1a12a1177b 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxSendOperator.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxSendOperator.java
@@ -186,7 +186,7 @@ public class MailboxSendOperator extends BaseOperator<TransferableBlock> {
       List<BaseDataBlock> dataTableList = new ArrayList<>(partitionSize);
       for (int i = 0; i < partitionSize; i++) {
         List<Object[]> objects = temporaryRows.get(i);
-        dataTableList.add(DataBlockBuilder.buildFromRows(objects, null, dataBlock.getDataSchema()));
+        dataTableList.add(DataBlockBuilder.buildFromRows(objects, dataBlock.getDataSchema()));
       }
       return dataTableList;
     }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org