You are viewing a plain text version of this content. The canonical link for it was not preserved in this plain-text copy.
Posted to commits@pinot.apache.org by si...@apache.org on 2022/09/20 19:52:13 UTC
[pinot] branch master updated: [multistage] support NULL in data blocks (#9427)
This is an automated email from the ASF dual-hosted git repository.
siddteotia pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 71b84452c9 [multistage] support NULL in data blocks (#9427)
71b84452c9 is described below
commit 71b84452c9a2d84b9e111e278479f21c81d321fc
Author: Rong Rong <ro...@apache.org>
AuthorDate: Tue Sep 20 12:52:06 2022 -0700
[multistage] support NULL in data blocks (#9427)
* initial commit for supporting null in datablock
* add test according to comment
Co-authored-by: Rong Rong <ro...@startree.ai>
---
.../pinot/core/common/datablock/BaseDataBlock.java | 21 ++-
.../core/common/datablock/ColumnarDataBlock.java | 2 +
.../core/common/datablock/DataBlockBuilder.java | 166 ++++++++++++++++-----
.../core/common/datablock/DataBlockUtils.java | 12 +-
.../pinot/core/common/datablock/RowDataBlock.java | 25 +---
.../pinot/core/common/datablock/DataBlockTest.java | 48 ++++--
.../core/common/datablock/DataBlockTestUtils.java | 28 ++--
.../query/runtime/blocks/TransferableBlock.java | 6 +-
.../runtime/operator/MailboxSendOperator.java | 2 +-
9 files changed, 216 insertions(+), 94 deletions(-)
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/common/datablock/BaseDataBlock.java b/pinot-core/src/main/java/org/apache/pinot/core/common/datablock/BaseDataBlock.java
index 74d42f6628..ce7da42063 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/common/datablock/BaseDataBlock.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/common/datablock/BaseDataBlock.java
@@ -86,6 +86,7 @@ public abstract class BaseDataBlock implements DataTable {
protected int _numRows;
protected int _numColumns;
+ protected int _fixDataSize;
protected DataSchema _dataSchema;
protected String[] _stringDictionary;
protected byte[] _fixedSizeDataBytes;
@@ -107,6 +108,7 @@ public abstract class BaseDataBlock implements DataTable {
_numRows = numRows;
_dataSchema = dataSchema;
_numColumns = dataSchema == null ? 0 : dataSchema.size();
+ _fixDataSize = 0;
_stringDictionary = stringDictionary;
_fixedSizeDataBytes = fixedSizeDataBytes;
_fixedSizeData = ByteBuffer.wrap(fixedSizeDataBytes);
@@ -122,6 +124,7 @@ public abstract class BaseDataBlock implements DataTable {
public BaseDataBlock() {
_numRows = 0;
_numColumns = 0;
+ _fixDataSize = 0;
_dataSchema = null;
_stringDictionary = null;
_fixedSizeDataBytes = null;
@@ -249,7 +252,23 @@ public abstract class BaseDataBlock implements DataTable {
@Nullable
@Override
public RoaringBitmap getNullRowIds(int colId) {
- return null;
+ // _fixedSizeData stores two ints per col's null bitmap: offset, and length.
+ int position = _fixDataSize + colId * Integer.BYTES * 2;
+ if (_fixedSizeData == null || position >= _fixedSizeData.limit()) {
+ return null;
+ }
+
+ _fixedSizeData.position(position);
+ int offset = _fixedSizeData.getInt();
+ int bytesLength = _fixedSizeData.getInt();
+ if (bytesLength > 0) {
+ _variableSizeData.position(offset);
+ byte[] nullBitmapBytes = new byte[bytesLength];
+ _variableSizeData.get(nullBitmapBytes);
+ return ObjectSerDeUtils.ROARING_BITMAP_SER_DE.deserialize(nullBitmapBytes);
+ } else {
+ return null;
+ }
}
// --------------------------------------------------------------------------
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/common/datablock/ColumnarDataBlock.java b/pinot-core/src/main/java/org/apache/pinot/core/common/datablock/ColumnarDataBlock.java
index 9a3cff57f7..cdf8ed0849 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/common/datablock/ColumnarDataBlock.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/common/datablock/ColumnarDataBlock.java
@@ -48,6 +48,7 @@ public class ColumnarDataBlock extends BaseDataBlock {
}
protected void computeBlockObjectConstants() {
+ _fixDataSize = 0;
if (_dataSchema != null) {
_cumulativeColumnOffsetSizeInBytes = new int[_numColumns];
_columnSizeInBytes = new int[_numColumns];
@@ -57,6 +58,7 @@ public class ColumnarDataBlock extends BaseDataBlock {
_cumulativeColumnOffsetSizeInBytes[i] = cumulativeColumnOffset;
cumulativeColumnOffset += _columnSizeInBytes[i] * _numRows;
}
+ _fixDataSize = cumulativeColumnOffset;
}
}
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/common/datablock/DataBlockBuilder.java b/pinot-core/src/main/java/org/apache/pinot/core/common/datablock/DataBlockBuilder.java
index 8a47f261ff..e4f9a76a85 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/common/datablock/DataBlockBuilder.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/common/datablock/DataBlockBuilder.java
@@ -38,7 +38,7 @@ import org.roaringbitmap.RoaringBitmap;
public class DataBlockBuilder {
private final DataSchema _dataSchema;
private final BaseDataBlock.Type _blockType;
- private final DataSchema.ColumnDataType[] _columnDataType;
+ private final DataSchema.ColumnDataType[] _columnDataTypes;
private int[] _columnOffsets;
private int _rowSizeInBytes;
@@ -57,7 +57,7 @@ public class DataBlockBuilder {
private DataBlockBuilder(DataSchema dataSchema, BaseDataBlock.Type blockType) {
_dataSchema = dataSchema;
- _columnDataType = dataSchema.getStoredColumnDataTypes();
+ _columnDataTypes = dataSchema.getStoredColumnDataTypes();
_blockType = blockType;
_numColumns = dataSchema.size();
if (_blockType == BaseDataBlock.Type.COLUMNAR) {
@@ -87,16 +87,30 @@ public class DataBlockBuilder {
}
}
- public static RowDataBlock buildFromRows(List<Object[]> rows, @Nullable RoaringBitmap[] colNullBitmaps,
- DataSchema dataSchema)
+ public static RowDataBlock buildFromRows(List<Object[]> rows, DataSchema dataSchema)
throws IOException {
DataBlockBuilder rowBuilder = new DataBlockBuilder(dataSchema, BaseDataBlock.Type.ROW);
+ // TODO: consolidate these null utils into data table utils.
+ // Selection / Agg / Distinct all have similar code.
+ int numColumns = rowBuilder._numColumns;
+ RoaringBitmap[] nullBitmaps = new RoaringBitmap[numColumns];
+ DataSchema.ColumnDataType[] storedColumnDataTypes = dataSchema.getStoredColumnDataTypes();
+ Object[] nullPlaceholders = new Object[numColumns];
+ for (int colId = 0; colId < numColumns; colId++) {
+ nullBitmaps[colId] = new RoaringBitmap();
+ nullPlaceholders[colId] = storedColumnDataTypes[colId].getNullPlaceholder();
+ }
rowBuilder._numRows = rows.size();
- for (Object[] row : rows) {
+ for (int rowId = 0; rowId < rows.size(); rowId++) {
+ Object[] row = rows.get(rowId);
ByteBuffer byteBuffer = ByteBuffer.allocate(rowBuilder._rowSizeInBytes);
- for (int i = 0; i < rowBuilder._numColumns; i++) {
- Object value = row[i];
- switch (rowBuilder._columnDataType[i]) {
+ for (int colId = 0; colId < rowBuilder._numColumns; colId++) {
+ Object value = row[colId];
+ if (value == null) {
+ nullBitmaps[colId].add(rowId);
+ value = nullPlaceholders[colId];
+ }
+ switch (rowBuilder._columnDataTypes[colId]) {
// Single-value column
case INT:
byteBuffer.putInt(((Number) value).intValue());
@@ -168,86 +182,147 @@ public class DataBlockBuilder {
}
break;
case BYTES_ARRAY:
+ setColumn(rowBuilder, byteBuffer, (byte[][]) value);
+ break;
case STRING_ARRAY:
setColumn(rowBuilder, byteBuffer, (String[]) value);
break;
default:
throw new IllegalStateException(
- String.format("Unsupported data type: %s for column: %s", rowBuilder._columnDataType[i],
- rowBuilder._dataSchema.getColumnName(i)));
+ String.format("Unsupported data type: %s for column: %s", rowBuilder._columnDataTypes[colId],
+ rowBuilder._dataSchema.getColumnName(colId)));
}
}
rowBuilder._fixedSizeDataByteArrayOutputStream.write(byteBuffer.array(), 0, byteBuffer.position());
}
// Write null bitmaps after writing data.
- if (colNullBitmaps != null) {
- for (RoaringBitmap nullBitmap : colNullBitmaps) {
- rowBuilder.setNullRowIds(nullBitmap);
- }
+ for (RoaringBitmap nullBitmap : nullBitmaps) {
+ rowBuilder.setNullRowIds(nullBitmap);
}
return buildRowBlock(rowBuilder);
}
- public static ColumnarDataBlock buildFromColumns(List<Object[]> columns, @Nullable RoaringBitmap[] colNullBitmaps,
- DataSchema dataSchema)
+ public static ColumnarDataBlock buildFromColumns(List<Object[]> columns, DataSchema dataSchema)
throws IOException {
DataBlockBuilder columnarBuilder = new DataBlockBuilder(dataSchema, BaseDataBlock.Type.COLUMNAR);
- for (int i = 0; i < columns.size(); i++) {
- Object[] column = columns.get(i);
+
+ // TODO: consolidate these null utils into data table utils.
+ // Selection / Agg / Distinct all have similar code.
+ int numColumns = columnarBuilder._numColumns;
+ RoaringBitmap[] nullBitmaps = new RoaringBitmap[numColumns];
+ DataSchema.ColumnDataType[] storedColumnDataTypes = dataSchema.getStoredColumnDataTypes();
+ Object[] nullPlaceholders = new Object[numColumns];
+ for (int colId = 0; colId < numColumns; colId++) {
+ nullBitmaps[colId] = new RoaringBitmap();
+ nullPlaceholders[colId] = storedColumnDataTypes[colId].getNullPlaceholder();
+ }
+ for (int colId = 0; colId < columns.size(); colId++) {
+ Object[] column = columns.get(colId);
columnarBuilder._numRows = column.length;
- ByteBuffer byteBuffer = ByteBuffer.allocate(columnarBuilder._numRows * columnarBuilder._columnSizeInBytes[i]);
- switch (columnarBuilder._columnDataType[i]) {
+ ByteBuffer byteBuffer = ByteBuffer.allocate(columnarBuilder._numRows * columnarBuilder._columnSizeInBytes[colId]);
+ Object value;
+ switch (columnarBuilder._columnDataTypes[colId]) {
// Single-value column
case INT:
- for (Object value : column) {
+ for (int rowId = 0; rowId < columnarBuilder._numRows; rowId++) {
+ value = column[rowId];
+ if (value == null) {
+ nullBitmaps[colId].add(rowId);
+ value = nullPlaceholders[colId];
+ }
byteBuffer.putInt(((Number) value).intValue());
}
break;
case LONG:
- for (Object value : column) {
+ for (int rowId = 0; rowId < columnarBuilder._numRows; rowId++) {
+ value = column[rowId];
+ if (value == null) {
+ nullBitmaps[colId].add(rowId);
+ value = nullPlaceholders[colId];
+ }
byteBuffer.putLong(((Number) value).longValue());
}
break;
case FLOAT:
- for (Object value : column) {
+ for (int rowId = 0; rowId < columnarBuilder._numRows; rowId++) {
+ value = column[rowId];
+ if (value == null) {
+ nullBitmaps[colId].add(rowId);
+ value = nullPlaceholders[colId];
+ }
byteBuffer.putFloat(((Number) value).floatValue());
}
break;
case DOUBLE:
- for (Object value : column) {
+ for (int rowId = 0; rowId < columnarBuilder._numRows; rowId++) {
+ value = column[rowId];
+ if (value == null) {
+ nullBitmaps[colId].add(rowId);
+ value = nullPlaceholders[colId];
+ }
byteBuffer.putDouble(((Number) value).doubleValue());
}
break;
case BIG_DECIMAL:
- for (Object value : column) {
+ for (int rowId = 0; rowId < columnarBuilder._numRows; rowId++) {
+ value = column[rowId];
+ if (value == null) {
+ nullBitmaps[colId].add(rowId);
+ value = nullPlaceholders[colId];
+ }
setColumn(columnarBuilder, byteBuffer, (BigDecimal) value);
}
break;
case STRING:
- for (Object value : column) {
+ for (int rowId = 0; rowId < columnarBuilder._numRows; rowId++) {
+ value = column[rowId];
+ if (value == null) {
+ nullBitmaps[colId].add(rowId);
+ value = nullPlaceholders[colId];
+ }
setColumn(columnarBuilder, byteBuffer, (String) value);
}
break;
case BYTES:
- for (Object value : column) {
+ for (int rowId = 0; rowId < columnarBuilder._numRows; rowId++) {
+ value = column[rowId];
+ if (value == null) {
+ nullBitmaps[colId].add(rowId);
+ value = nullPlaceholders[colId];
+ }
setColumn(columnarBuilder, byteBuffer, (ByteArray) value);
}
break;
case OBJECT:
- for (Object value : column) {
+ for (int rowId = 0; rowId < columnarBuilder._numRows; rowId++) {
+ value = column[rowId];
+ if (value == null) {
+ nullBitmaps[colId].add(rowId);
+ value = nullPlaceholders[colId];
+ }
setColumn(columnarBuilder, byteBuffer, value);
}
break;
// Multi-value column
case BOOLEAN_ARRAY:
case INT_ARRAY:
- for (Object value : column) {
+ for (int rowId = 0; rowId < columnarBuilder._numRows; rowId++) {
+ value = column[rowId];
+ if (value == null) {
+ nullBitmaps[colId].add(rowId);
+ value = nullPlaceholders[colId];
+ }
setColumn(columnarBuilder, byteBuffer, (int[]) value);
}
break;
case TIMESTAMP_ARRAY:
case LONG_ARRAY:
- for (Object value : column) {
+ for (int rowId = 0; rowId < columnarBuilder._numRows; rowId++) {
+ value = column[rowId];
+ if (value == null) {
+ nullBitmaps[colId].add(rowId);
+ value = nullPlaceholders[colId];
+ }
if (value instanceof int[]) {
// LONG_ARRAY type covers INT_ARRAY and LONG_ARRAY
int[] ints = (int[]) value;
@@ -261,12 +336,22 @@ public class DataBlockBuilder {
}
break;
case FLOAT_ARRAY:
- for (Object value : column) {
+ for (int rowId = 0; rowId < columnarBuilder._numRows; rowId++) {
+ value = column[rowId];
+ if (value == null) {
+ nullBitmaps[colId].add(rowId);
+ value = nullPlaceholders[colId];
+ }
setColumn(columnarBuilder, byteBuffer, (float[]) value);
}
break;
case DOUBLE_ARRAY:
- for (Object value : column) {
+ for (int rowId = 0; rowId < columnarBuilder._numRows; rowId++) {
+ value = column[rowId];
+ if (value == null) {
+ nullBitmaps[colId].add(rowId);
+ value = nullPlaceholders[colId];
+ }
// DOUBLE_ARRAY type covers INT_ARRAY, LONG_ARRAY, FLOAT_ARRAY and DOUBLE_ARRAY
if (value instanceof int[]) {
int[] ints = (int[]) value;
@@ -293,22 +378,25 @@ public class DataBlockBuilder {
break;
case BYTES_ARRAY:
case STRING_ARRAY:
- for (Object value : column) {
+ for (int rowId = 0; rowId < columnarBuilder._numRows; rowId++) {
+ value = column[rowId];
+ if (value == null) {
+ nullBitmaps[colId].add(rowId);
+ value = nullPlaceholders[colId];
+ }
setColumn(columnarBuilder, byteBuffer, (String[]) value);
}
break;
default:
throw new IllegalStateException(
- String.format("Unsupported data type: %s for column: %s", columnarBuilder._columnDataType[i],
- columnarBuilder._dataSchema.getColumnName(i)));
+ String.format("Unsupported data type: %s for column: %s", columnarBuilder._columnDataTypes[colId],
+ columnarBuilder._dataSchema.getColumnName(colId)));
}
columnarBuilder._fixedSizeDataByteArrayOutputStream.write(byteBuffer.array(), 0, byteBuffer.position());
}
// Write null bitmaps after writing data.
- if (colNullBitmaps != null) {
- for (RoaringBitmap nullBitmap : colNullBitmaps) {
- columnarBuilder.setNullRowIds(nullBitmap);
- }
+ for (RoaringBitmap nullBitmap : nullBitmaps) {
+ columnarBuilder.setNullRowIds(nullBitmap);
}
return buildColumnarBlock(columnarBuilder);
}
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/common/datablock/DataBlockUtils.java b/pinot-core/src/main/java/org/apache/pinot/core/common/datablock/DataBlockUtils.java
index d1d9185d11..88cdf8f2db 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/common/datablock/DataBlockUtils.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/common/datablock/DataBlockUtils.java
@@ -28,6 +28,7 @@ import javax.annotation.Nonnull;
import org.apache.pinot.common.exception.QueryException;
import org.apache.pinot.common.response.ProcessingException;
import org.apache.pinot.common.utils.DataSchema;
+import org.roaringbitmap.RoaringBitmap;
public final class DataBlockUtils {
@@ -74,7 +75,7 @@ public final class DataBlockUtils {
}
}
- public static List<Object[]> extraRows(BaseDataBlock dataBlock) {
+ public static List<Object[]> extractRows(BaseDataBlock dataBlock) {
DataSchema dataSchema = dataBlock.getDataSchema();
DataSchema.ColumnDataType[] storedColumnDataTypes = dataSchema.getStoredColumnDataTypes();
int numRows = dataBlock.getNumberOfRows();
@@ -133,6 +134,15 @@ public final class DataBlockUtils {
}
rows.add(row);
}
+
+ for (int colId = 0; colId < numColumns; colId++) {
+ RoaringBitmap nullBitmap = dataBlock.getNullRowIds(colId);
+ if (nullBitmap != null) {
+ for (Integer rowId : nullBitmap) {
+ rows.get(rowId)[colId] = null;
+ }
+ }
+ }
return rows;
}
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/common/datablock/RowDataBlock.java b/pinot-core/src/main/java/org/apache/pinot/core/common/datablock/RowDataBlock.java
index fdfea0f848..5bdf121fad 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/common/datablock/RowDataBlock.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/common/datablock/RowDataBlock.java
@@ -20,10 +20,7 @@ package org.apache.pinot.core.common.datablock;
import java.io.IOException;
import java.nio.ByteBuffer;
-import javax.annotation.Nullable;
import org.apache.pinot.common.utils.DataSchema;
-import org.apache.pinot.core.common.ObjectSerDeUtils;
-import org.roaringbitmap.RoaringBitmap;
/**
@@ -50,31 +47,11 @@ public class RowDataBlock extends BaseDataBlock {
computeBlockObjectConstants();
}
- @Nullable
- @Override
- public RoaringBitmap getNullRowIds(int colId) {
- // _fixedSizeData stores two ints per col's null bitmap: offset, and length.
- int position = _numRows * _rowSizeInBytes + colId * Integer.BYTES * 2;
- if (_fixedSizeData == null || position >= _fixedSizeData.limit()) {
- return null;
- }
-
- _fixedSizeData.position(position);
- int offset = _fixedSizeData.getInt();
- int bytesLength = _fixedSizeData.getInt();
- if (bytesLength > 0) {
- _variableSizeData.position(offset);
- byte[] nullBitmapBytes = new byte[bytesLength];
- _variableSizeData.get(nullBitmapBytes);
- return ObjectSerDeUtils.ROARING_BITMAP_SER_DE.deserialize(nullBitmapBytes);
- }
- return null;
- }
-
protected void computeBlockObjectConstants() {
if (_dataSchema != null) {
_columnOffsets = new int[_numColumns];
_rowSizeInBytes = DataBlockUtils.computeColumnOffsets(_dataSchema, _columnOffsets);
+ _fixDataSize = _numRows * _rowSizeInBytes;
}
}
diff --git a/pinot-core/src/test/java/org/apache/pinot/core/common/datablock/DataBlockTest.java b/pinot-core/src/test/java/org/apache/pinot/core/common/datablock/DataBlockTest.java
index f673d2433d..70ca2e067e 100644
--- a/pinot-core/src/test/java/org/apache/pinot/core/common/datablock/DataBlockTest.java
+++ b/pinot-core/src/test/java/org/apache/pinot/core/common/datablock/DataBlockTest.java
@@ -29,7 +29,9 @@ import org.apache.pinot.common.utils.DataSchema;
import org.apache.pinot.common.utils.DataTable;
import org.apache.pinot.core.common.datatable.DataTableFactory;
import org.apache.pinot.core.query.selection.SelectionOperatorUtils;
+import org.roaringbitmap.RoaringBitmap;
import org.testng.Assert;
+import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;
@@ -64,8 +66,8 @@ public class DataBlockTest {
*
* @throws Exception
*/
- @Test
- public void testRowDataBlockCompatibleWithDataTableV4()
+ @Test(dataProvider = "testTypeNullPercentile")
+ public void testRowDataBlockCompatibleWithDataTableV4(int nullPercentile)
throws Exception {
DataSchema.ColumnDataType[] allDataTypes = DataSchema.ColumnDataType.values();
List<DataSchema.ColumnDataType> columnDataTypes = new ArrayList<DataSchema.ColumnDataType>();
@@ -79,9 +81,9 @@ public class DataBlockTest {
DataSchema dataSchema = new DataSchema(columnNames.toArray(new String[0]),
columnDataTypes.toArray(new DataSchema.ColumnDataType[0]));
- List<Object[]> rows = DataBlockTestUtils.getRandomRows(dataSchema, TEST_ROW_COUNT);
+ List<Object[]> rows = DataBlockTestUtils.getRandomRows(dataSchema, TEST_ROW_COUNT, nullPercentile);
DataTableFactory.setDataTableVersion(DataTableFactory.VERSION_4);
- DataTable dataTableImpl = SelectionOperatorUtils.getDataTableFromRows(rows, dataSchema, false);
+ DataTable dataTableImpl = SelectionOperatorUtils.getDataTableFromRows(rows, dataSchema, true);
DataTable dataBlockFromDataTable = DataBlockUtils.getDataBlock(ByteBuffer.wrap(dataTableImpl.toBytes()));
for (int rowId = 0; rowId < TEST_ROW_COUNT; rowId++) {
@@ -93,23 +95,34 @@ public class DataBlockTest {
+ " from DataBlock: [" + rowFromBlock[rowId] + "], from DataTable: [" + rowFromDataTable[colId] + "]");
}
}
+
+ for (int colId = 0; colId < dataSchema.getColumnNames().length; colId++) {
+ RoaringBitmap dataBlockBitmap = dataBlockFromDataTable.getNullRowIds(colId);
+ RoaringBitmap dataTableBitmap = dataTableImpl.getNullRowIds(colId);
+ Assert.assertEquals(dataBlockBitmap, dataTableBitmap);
+ }
}
- @Test
- public void testAllDataTypes()
+ @Test(dataProvider = "testTypeNullPercentile")
+ public void testAllDataTypes(int nullPercentile)
throws Exception {
- DataSchema.ColumnDataType[] columnDataTypes = DataSchema.ColumnDataType.values();
- int numColumns = columnDataTypes.length;
- String[] columnNames = new String[numColumns];
- for (int i = 0; i < numColumns; i++) {
- columnNames[i] = columnDataTypes[i].name();
+
+ DataSchema.ColumnDataType[] allDataTypes = DataSchema.ColumnDataType.values();
+ List<DataSchema.ColumnDataType> columnDataTypes = new ArrayList<DataSchema.ColumnDataType>();
+ List<String> columnNames = new ArrayList<String>();
+ for (int i = 0; i < allDataTypes.length; i++) {
+ if (!EXCLUDE_DATA_TYPES.contains(allDataTypes[i])) {
+ columnNames.add(allDataTypes[i].name());
+ columnDataTypes.add(allDataTypes[i]);
+ }
}
- DataSchema dataSchema = new DataSchema(columnNames, columnDataTypes);
- List<Object[]> rows = DataBlockTestUtils.getRandomRows(dataSchema, TEST_ROW_COUNT);
+ DataSchema dataSchema = new DataSchema(columnNames.toArray(new String[]{}),
+ columnDataTypes.toArray(new DataSchema.ColumnDataType[]{}));
+ List<Object[]> rows = DataBlockTestUtils.getRandomRows(dataSchema, TEST_ROW_COUNT, nullPercentile);
List<Object[]> columnars = DataBlockTestUtils.convertColumnar(dataSchema, rows);
- RowDataBlock rowBlock = DataBlockBuilder.buildFromRows(rows, null, dataSchema);
- ColumnarDataBlock columnarBlock = DataBlockBuilder.buildFromColumns(columnars, null, dataSchema);
+ RowDataBlock rowBlock = DataBlockBuilder.buildFromRows(rows, dataSchema);
+ ColumnarDataBlock columnarBlock = DataBlockBuilder.buildFromColumns(columnars, dataSchema);
for (int colId = 0; colId < dataSchema.getColumnNames().length; colId++) {
DataSchema.ColumnDataType columnDataType = dataSchema.getColumnDataType(colId);
@@ -121,4 +134,9 @@ public class DataBlockTest {
}
}
}
+
+ @DataProvider(name = "testTypeNullPercentile")
+ public Object[][] provideTestTypeNullPercentile() {
+ return new Object[][]{new Object[]{0}, new Object[]{10}, new Object[]{100}};
+ }
}
diff --git a/pinot-core/src/test/java/org/apache/pinot/core/common/datablock/DataBlockTestUtils.java b/pinot-core/src/test/java/org/apache/pinot/core/common/datablock/DataBlockTestUtils.java
index 54f046d13f..e56ccc422b 100644
--- a/pinot-core/src/test/java/org/apache/pinot/core/common/datablock/DataBlockTestUtils.java
+++ b/pinot-core/src/test/java/org/apache/pinot/core/common/datablock/DataBlockTestUtils.java
@@ -25,6 +25,7 @@ import java.util.Random;
import org.apache.commons.lang.RandomStringUtils;
import org.apache.pinot.common.utils.DataSchema;
import org.apache.pinot.spi.utils.ByteArray;
+import org.roaringbitmap.RoaringBitmap;
public class DataBlockTestUtils {
@@ -35,7 +36,7 @@ public class DataBlockTestUtils {
// do not instantiate.
}
- public static Object[] getRandomRow(DataSchema dataSchema) {
+ public static Object[] getRandomRow(DataSchema dataSchema, int nullPercentile) {
final int numColumns = dataSchema.getColumnNames().length;
DataSchema.ColumnDataType[] columnDataTypes = dataSchema.getColumnDataTypes();
Object[] row = new Object[numColumns];
@@ -100,7 +101,6 @@ public class DataBlockTestUtils {
}
row[colId] = doubleArray;
break;
- case BYTES_ARRAY:
case STRING_ARRAY:
length = RANDOM.nextInt(ARRAY_SIZE);
String[] stringArray = new String[length];
@@ -112,12 +112,22 @@ public class DataBlockTestUtils {
default:
throw new UnsupportedOperationException("Can't fill random data for column type: " + columnDataTypes[colId]);
}
+ // randomly set some entry to null
+ if (columnDataTypes[colId].getStoredType() != DataSchema.ColumnDataType.OBJECT) {
+ row[colId] = randomlySettingNull(nullPercentile) ? null : row[colId];
+ }
}
return row;
}
public static Object getElement(BaseDataBlock dataBlock, int rowId, int colId,
DataSchema.ColumnDataType columnDataType) {
+ RoaringBitmap nullBitmap = dataBlock.getNullRowIds(colId);
+ if (nullBitmap != null) {
+ if (nullBitmap.contains(rowId)) {
+ return null;
+ }
+ }
switch (columnDataType.getStoredType()) {
case INT:
return dataBlock.getInt(rowId, colId);
@@ -145,7 +155,6 @@ public class DataBlockTestUtils {
return dataBlock.getFloatArray(rowId, colId);
case DOUBLE_ARRAY:
return dataBlock.getDoubleArray(rowId, colId);
- case BYTES_ARRAY:
case STRING_ARRAY:
return dataBlock.getStringArray(rowId, colId);
default:
@@ -153,19 +162,14 @@ public class DataBlockTestUtils {
}
}
- public static List<Object[]> getRandomRows(DataSchema dataSchema, int numRows) {
+ public static List<Object[]> getRandomRows(DataSchema dataSchema, int numRows, int nullPercentile) {
List<Object[]> rows = new ArrayList<>(numRows);
for (int i = 0; i < numRows; i++) {
- rows.add(getRandomRow(dataSchema));
+ rows.add(getRandomRow(dataSchema, nullPercentile));
}
return rows;
}
- public static List<Object[]> getRandomColumnar(DataSchema dataSchema, int numRows) {
- List<Object[]> rows = getRandomRows(dataSchema, numRows);
- return convertColumnar(dataSchema, rows);
- }
-
public static List<Object[]> convertColumnar(DataSchema dataSchema, List<Object[]> rows) {
final int numRows = rows.size();
final int numColumns = dataSchema.getColumnNames().length;
@@ -178,4 +182,8 @@ public class DataBlockTestUtils {
}
return columnars;
}
+
+ public static boolean randomlySettingNull(int percentile) {
+ return RANDOM.nextInt(100) >= (100 - percentile);
+ }
}
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/blocks/TransferableBlock.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/blocks/TransferableBlock.java
index 775a5de6ef..d9b4c31db0 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/blocks/TransferableBlock.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/blocks/TransferableBlock.java
@@ -83,7 +83,7 @@ public class TransferableBlock implements Block {
if (_container == null) {
switch (_type) {
case ROW:
- _container = DataBlockUtils.extraRows(_dataBlock);
+ _container = DataBlockUtils.extractRows(_dataBlock);
break;
case COLUMNAR:
default:
@@ -105,10 +105,10 @@ public class TransferableBlock implements Block {
try {
switch (_type) {
case ROW:
- _dataBlock = DataBlockBuilder.buildFromRows(_container, null, _dataSchema);
+ _dataBlock = DataBlockBuilder.buildFromRows(_container, _dataSchema);
break;
case COLUMNAR:
- _dataBlock = DataBlockBuilder.buildFromColumns(_container, null, _dataSchema);
+ _dataBlock = DataBlockBuilder.buildFromColumns(_container, _dataSchema);
break;
case METADATA:
throw new UnsupportedOperationException("Metadata block cannot be constructed from container");
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxSendOperator.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxSendOperator.java
index 32375dc4fa..1a12a1177b 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxSendOperator.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxSendOperator.java
@@ -186,7 +186,7 @@ public class MailboxSendOperator extends BaseOperator<TransferableBlock> {
List<BaseDataBlock> dataTableList = new ArrayList<>(partitionSize);
for (int i = 0; i < partitionSize; i++) {
List<Object[]> objects = temporaryRows.get(i);
- dataTableList.add(DataBlockBuilder.buildFromRows(objects, null, dataBlock.getDataSchema()));
+ dataTableList.add(DataBlockBuilder.buildFromRows(objects, dataBlock.getDataSchema()));
}
return dataTableList;
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org