Posted to commits@pinot.apache.org by ro...@apache.org on 2022/11/10 19:41:31 UTC
[pinot] branch master updated: [multistage] leaf-node return data in non-serialized format (#9755)
This is an automated email from the ASF dual-hosted git repository.
rongr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 1de904f545 [multistage] leaf-node return data in non-serialized format (#9755)
1de904f545 is described below
commit 1de904f54576540cabd425f02134e4b10597ddf5
Author: Rong Rong <ro...@apache.org>
AuthorDate: Thu Nov 10 11:41:24 2022 -0800
[multistage] leaf-node return data in non-serialized format (#9755)
This PR directly uses the InstanceResponseBlock to get the non-serialized rows. This
- avoids the unnecessary ser/de round trip
- splits data into chunks more directly via the List API instead of via the block API
Co-authored-by: Rong Rong <ro...@startree.ai>
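For context, a minimal sketch of the new leaf-stage hand-off. The types and the calls on them
(InstanceResponseBlock, BaseResultsBlock.getRows, getQueryContext, getDataSchema, the
TransferableBlock constructor) are the ones touched in the diff below; the helper name
toTransferableBlock and the surrounding scaffolding are illustrative only, not the exact
QueryRunner code:

    import java.util.ArrayList;
    import java.util.Collections;
    import java.util.List;
    import org.apache.pinot.common.datablock.DataBlock;
    import org.apache.pinot.core.operator.blocks.InstanceResponseBlock;
    import org.apache.pinot.core.operator.blocks.results.BaseResultsBlock;
    import org.apache.pinot.query.runtime.blocks.TransferableBlock;

    // Sketch: wrap a leaf-stage InstanceResponseBlock's rows directly, keeping them
    // on-heap as Object[] -- no DataTable.toBytes() followed by
    // DataBlockUtils.getDataBlock() round trip as before. Serialization now happens
    // only at the mailbox boundary.
    static TransferableBlock toTransferableBlock(InstanceResponseBlock responseBlock) {
      BaseResultsBlock resultsBlock = responseBlock.getResultsBlock();
      List<Object[]> rows = resultsBlock != null
          ? new ArrayList<>(resultsBlock.getRows(responseBlock.getQueryContext()))
          : Collections.<Object[]>emptyList();
      return new TransferableBlock(rows, responseBlock.getDataSchema(), DataBlock.Type.ROW);
    }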
---
.../core/common/datablock/DataBlockBuilder.java | 5 +-
.../pinot/core/common/datablock/DataBlockTest.java | 101 ---------------------
.../core/common/datablock/DataBlockTestUtils.java | 8 +-
.../apache/pinot/query/runtime/QueryRunner.java | 71 +++++++++------
.../query/runtime/blocks/TransferableBlock.java | 7 ++
.../runtime/blocks/TransferableBlockUtils.java | 80 ++++++++++++----
.../runtime/operator/MailboxSendOperator.java | 15 +--
.../query/runtime/plan/PhysicalPlanVisitor.java | 2 +-
.../query/runtime/TransferableBlockUtilsTest.java | 9 +-
.../testutils/MockDataBlockOperatorFactory.java | 4 +-
10 files changed, 135 insertions(+), 167 deletions(-)
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/common/datablock/DataBlockBuilder.java b/pinot-core/src/main/java/org/apache/pinot/core/common/datablock/DataBlockBuilder.java
index 4524a733e8..cf84568fcb 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/common/datablock/DataBlockBuilder.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/common/datablock/DataBlockBuilder.java
@@ -28,7 +28,6 @@ import java.nio.ByteBuffer;
import java.sql.Timestamp;
import java.util.List;
import javax.annotation.Nullable;
-import org.apache.pinot.common.datablock.BaseDataBlock;
import org.apache.pinot.common.datablock.ColumnarDataBlock;
import org.apache.pinot.common.datablock.DataBlock;
import org.apache.pinot.common.datablock.DataBlockUtils;
@@ -45,7 +44,7 @@ import org.roaringbitmap.RoaringBitmap;
public class DataBlockBuilder {
private final DataSchema _dataSchema;
- private final BaseDataBlock.Type _blockType;
+ private final DataBlock.Type _blockType;
private final DataSchema.ColumnDataType[] _columnDataTypes;
private int[] _columnOffsets;
@@ -63,7 +62,7 @@ public class DataBlockBuilder {
private final DataOutputStream _variableSizeDataOutputStream =
new DataOutputStream(_variableSizeDataByteArrayOutputStream);
- private DataBlockBuilder(DataSchema dataSchema, BaseDataBlock.Type blockType) {
+ private DataBlockBuilder(DataSchema dataSchema, DataBlock.Type blockType) {
_dataSchema = dataSchema;
_columnDataTypes = dataSchema.getColumnDataTypes();
_blockType = blockType;
diff --git a/pinot-core/src/test/java/org/apache/pinot/core/common/datablock/DataBlockTest.java b/pinot-core/src/test/java/org/apache/pinot/core/common/datablock/DataBlockTest.java
index 72f6cd93ca..64425c84c1 100644
--- a/pinot-core/src/test/java/org/apache/pinot/core/common/datablock/DataBlockTest.java
+++ b/pinot-core/src/test/java/org/apache/pinot/core/common/datablock/DataBlockTest.java
@@ -20,23 +20,15 @@ package org.apache.pinot.core.common.datablock;
import com.google.common.collect.ImmutableList;
import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.sql.Timestamp;
import java.util.ArrayList;
import java.util.List;
import org.apache.pinot.common.datablock.ColumnarDataBlock;
import org.apache.pinot.common.datablock.DataBlock;
import org.apache.pinot.common.datablock.DataBlockUtils;
import org.apache.pinot.common.datablock.RowDataBlock;
-import org.apache.pinot.common.datatable.DataTable;
-import org.apache.pinot.common.datatable.DataTableFactory;
import org.apache.pinot.common.exception.QueryException;
import org.apache.pinot.common.response.ProcessingException;
import org.apache.pinot.common.utils.DataSchema;
-import org.apache.pinot.core.common.datatable.DataTableBuilderFactory;
-import org.apache.pinot.core.query.selection.SelectionOperatorUtils;
-import org.apache.pinot.spi.utils.ArrayCopyUtils;
-import org.roaringbitmap.RoaringBitmap;
import org.testng.Assert;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;
@@ -67,99 +59,6 @@ public class DataBlockTest {
originalException.getMessage());
}
- /**
- * This test is only here to ensure that {@link org.apache.pinot.core.query.executor.ServerQueryExecutorV1Impl}
- * producing data table can actually be wrapped and sent via mailbox in the {@link RowDataBlock} format.
- *
- * @throws Exception
- */
- @Test(dataProvider = "testTypeNullPercentile")
- public void testRowDataBlockCompatibleWithDataTableV4(int nullPercentile)
- throws Exception {
- DataSchema.ColumnDataType[] allDataTypes = DataSchema.ColumnDataType.values();
- List<DataSchema.ColumnDataType> columnDataTypes = new ArrayList<DataSchema.ColumnDataType>();
- List<String> columnNames = new ArrayList<String>();
- for (int i = 0; i < allDataTypes.length; i++) {
- if (!EXCLUDE_DATA_TYPES.contains(allDataTypes[i])) {
- columnNames.add(allDataTypes[i].name());
- columnDataTypes.add(allDataTypes[i]);
- }
- }
-
- DataSchema dataSchema = new DataSchema(columnNames.toArray(new String[0]),
- columnDataTypes.toArray(new DataSchema.ColumnDataType[0]));
- int numColumns = dataSchema.getColumnDataTypes().length;
- List<Object[]> rows = DataBlockTestUtils.getRandomRows(dataSchema, TEST_ROW_COUNT, nullPercentile);
- DataTableBuilderFactory.setDataTableVersion(DataTableFactory.VERSION_4);
- convertToDataTableCompatibleRows(rows, dataSchema);
- DataTable dataTableImpl = SelectionOperatorUtils.getDataTableFromRows(rows, dataSchema, true);
- DataBlock dataBlockFromDataTable = DataBlockUtils.getDataBlock(ByteBuffer.wrap(dataTableImpl.toBytes()));
-
- RoaringBitmap[] nullBitmaps = new RoaringBitmap[numColumns];
- for (int coldId = 0; coldId < numColumns; coldId++) {
- nullBitmaps[coldId] = dataTableImpl.getNullRowIds(coldId);
- }
-
- List<Object[]> rowsFromBlock = DataBlockUtils.extractRows(dataBlockFromDataTable);
- for (int rowId = 0; rowId < TEST_ROW_COUNT; rowId++) {
- Object[] rowFromDataTable = SelectionOperatorUtils.extractRowFromDataTableWithNullHandling(dataTableImpl, rowId,
- nullBitmaps);
- Object[] rowFromBlock = rowsFromBlock.get(rowId);
- for (int colId = 0; colId < numColumns; colId++) {
- Object dataTableObj = rowFromDataTable[colId] == null ? null
- : dataSchema.getColumnDataType(colId).convert(rowFromDataTable[colId]);
- Object dataBlockObj = rowFromBlock[colId];
- Assert.assertEquals(dataBlockObj, dataTableObj, "Error comparing Row/Column Block "
- + " at (" + rowId + "," + colId + ") of Type: " + dataSchema.getColumnDataType(colId) + "! "
- + " from DataBlock: [" + dataBlockObj + "], from DataTable: [" + dataTableObj + "]");
- }
- }
-
- for (int colId = 0; colId < dataSchema.getColumnNames().length; colId++) {
- RoaringBitmap dataBlockBitmap = dataBlockFromDataTable.getNullRowIds(colId);
- RoaringBitmap dataTableBitmap = dataTableImpl.getNullRowIds(colId);
- Assert.assertEquals(dataBlockBitmap, dataTableBitmap);
- }
- }
-
- private static void convertToDataTableCompatibleRows(List<Object[]> rows, DataSchema dataSchema) {
- int numColumns = dataSchema.getColumnNames().length;
- for (int rowId = 0; rowId < rows.size(); rowId++) {
- for (int colId = 0; colId < numColumns; colId++) {
- switch (dataSchema.getColumnDataType(colId)) {
- case BOOLEAN:
- if (rows.get(rowId)[colId] != null) {
- rows.get(rowId)[colId] = ((boolean) rows.get(rowId)[colId]) ? 1 : 0;
- }
- break;
- case TIMESTAMP:
- if (rows.get(rowId)[colId] != null) {
- rows.get(rowId)[colId] = ((Timestamp) rows.get(rowId)[colId]).getTime();
- }
- break;
- case BOOLEAN_ARRAY:
- if (rows.get(rowId)[colId] != null) {
- boolean[] booleans = (boolean[]) rows.get(rowId)[colId];
- int[] ints = new int[booleans.length];
- ArrayCopyUtils.copy(booleans, ints, booleans.length);
- rows.get(rowId)[colId] = ints;
- }
- break;
- case TIMESTAMP_ARRAY:
- if (rows.get(rowId)[colId] != null) {
- Timestamp[] timestamps = (Timestamp[]) rows.get(rowId)[colId];
- long[] longs = new long[timestamps.length];
- ArrayCopyUtils.copy(timestamps, longs, timestamps.length);
- rows.get(rowId)[colId] = longs;
- }
- break;
- default:
- break;
- }
- }
- }
- }
-
@Test(dataProvider = "testTypeNullPercentile")
public void testAllDataTypes(int nullPercentile)
throws Exception {
diff --git a/pinot-core/src/test/java/org/apache/pinot/core/common/datablock/DataBlockTestUtils.java b/pinot-core/src/test/java/org/apache/pinot/core/common/datablock/DataBlockTestUtils.java
index 427f727f55..67cc82cba6 100644
--- a/pinot-core/src/test/java/org/apache/pinot/core/common/datablock/DataBlockTestUtils.java
+++ b/pinot-core/src/test/java/org/apache/pinot/core/common/datablock/DataBlockTestUtils.java
@@ -24,7 +24,7 @@ import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import org.apache.commons.lang.RandomStringUtils;
-import org.apache.pinot.common.datablock.BaseDataBlock;
+import org.apache.pinot.common.datablock.DataBlock;
import org.apache.pinot.common.utils.DataSchema;
import org.apache.pinot.spi.utils.ByteArray;
import org.roaringbitmap.RoaringBitmap;
@@ -60,7 +60,7 @@ public class DataBlockTestUtils {
row[colId] = BigDecimal.valueOf(RANDOM.nextDouble());
break;
case BOOLEAN:
- row[colId] = RANDOM.nextInt(2) % 2 == 1;
+ row[colId] = RANDOM.nextBoolean();
break;
case TIMESTAMP:
row[colId] = new Timestamp(RANDOM.nextLong());
@@ -115,7 +115,7 @@ public class DataBlockTestUtils {
length = RANDOM.nextInt(ARRAY_SIZE);
boolean[] booleanArray = new boolean[length];
for (int i = 0; i < length; i++) {
- booleanArray[i] = RANDOM.nextInt(2) % 2 == 1;
+ booleanArray[i] = RANDOM.nextBoolean();
}
row[colId] = booleanArray;
break;
@@ -138,7 +138,7 @@ public class DataBlockTestUtils {
return row;
}
- public static Object getElement(BaseDataBlock dataBlock, int rowId, int colId,
+ public static Object getElement(DataBlock dataBlock, int rowId, int colId,
DataSchema.ColumnDataType columnDataType) {
RoaringBitmap nullBitmap = dataBlock.getNullRowIds(colId);
if (nullBitmap != null) {
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java
index bf3872e2cb..d0e8a86489 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java
@@ -19,10 +19,12 @@
package org.apache.pinot.query.runtime;
import com.google.common.base.Preconditions;
-import java.nio.ByteBuffer;
import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
import java.util.List;
import java.util.Map;
+import java.util.PriorityQueue;
import java.util.concurrent.ExecutorService;
import javax.annotation.Nullable;
import org.apache.helix.HelixManager;
@@ -32,6 +34,7 @@ import org.apache.pinot.common.datablock.DataBlock;
import org.apache.pinot.common.datablock.DataBlockUtils;
import org.apache.pinot.common.datablock.MetadataBlock;
import org.apache.pinot.common.datatable.DataTable;
+import org.apache.pinot.common.exception.QueryException;
import org.apache.pinot.common.metadata.ZKMetadataProvider;
import org.apache.pinot.common.metrics.ServerMetrics;
import org.apache.pinot.common.utils.DataSchema;
@@ -39,6 +42,7 @@ import org.apache.pinot.core.common.Operator;
import org.apache.pinot.core.data.manager.InstanceDataManager;
import org.apache.pinot.core.operator.BaseOperator;
import org.apache.pinot.core.operator.blocks.InstanceResponseBlock;
+import org.apache.pinot.core.operator.blocks.results.BaseResultsBlock;
import org.apache.pinot.core.query.executor.ServerQueryExecutorV1Impl;
import org.apache.pinot.core.query.request.ServerQueryRequest;
import org.apache.pinot.core.transport.ServerInstance;
@@ -124,7 +128,7 @@ public class QueryRunner {
requestMetadataMap, _helixPropertyStore, _mailboxService);
// send the data table via mailbox in one-off fashion (e.g. no block-level split, one data table/partition key)
- List<DataBlock> serverQueryResults = new ArrayList<>(serverQueryRequests.size());
+ List<InstanceResponseBlock> serverQueryResults = new ArrayList<>(serverQueryRequests.size());
for (ServerPlanRequestContext requestContext : serverQueryRequests) {
ServerQueryRequest request = new ServerQueryRequest(requestContext.getInstanceRequest(),
new ServerMetrics(PinotMetricUtils.getPinotMetricsRegistry()), System.currentTimeMillis());
@@ -138,7 +142,7 @@ public class QueryRunner {
new LeafStageTransferableBlockOperator(serverQueryResults, sendNode.getDataSchema()),
receivingStageMetadata.getServerInstances(), sendNode.getExchangeType(),
sendNode.getPartitionKeySelector(), _hostname, _port, serverQueryRequests.get(0).getRequestId(),
- sendNode.getStageId());
+ sendNode.getStageId(), true);
int blockCounter = 0;
while (!TransferableBlockUtils.isEndOfStream(mailboxSendOperator.nextBlock())) {
LOGGER.debug("Acquired transferable block: {}", blockCounter++);
@@ -184,22 +188,15 @@ public class QueryRunner {
return requests;
}
- private DataBlock processServerQuery(ServerQueryRequest serverQueryRequest, ExecutorService executorService) {
- DataBlock dataBlock;
+ private InstanceResponseBlock processServerQuery(ServerQueryRequest serverQueryRequest,
+ ExecutorService executorService) {
try {
- InstanceResponseBlock instanceResponse = _serverExecutor.execute(serverQueryRequest, executorService);
- if (!instanceResponse.getExceptions().isEmpty()) {
- // if contains exception, directly return a metadata block with the exceptions.
- dataBlock = DataBlockUtils.getErrorDataBlock(instanceResponse.getExceptions());
- } else {
- // this works because default DataTableImplV3 will have a version number at beginning:
- // the new DataBlock encodes lower 16 bits as version and upper 16 bits as type (ROW, COLUMNAR, METADATA)
- dataBlock = DataBlockUtils.getDataBlock(ByteBuffer.wrap(instanceResponse.toDataTable().toBytes()));
- }
+ return _serverExecutor.execute(serverQueryRequest, executorService);
} catch (Exception e) {
- dataBlock = DataBlockUtils.getErrorDataBlock(e);
+ InstanceResponseBlock errorResponse = new InstanceResponseBlock();
+ errorResponse.getExceptions().put(QueryException.QUERY_EXECUTION_ERROR_CODE, e.getMessage());
+ return errorResponse;
}
- return dataBlock;
}
/**
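A side note on the comment removed above: the old round trip relied on the serialized header
layout, where the leading int packs the version into the lower 16 bits and the block type
(ROW, COLUMNAR, METADATA) into the upper 16 bits. A standalone sketch of that packing, with
hypothetical method names (not Pinot API):

    // Per the removed comment: low 16 bits = version, high 16 bits = type ordinal.
    static int packHeader(int version, int typeOrdinal) {
      return (typeOrdinal << 16) | (version & 0xFFFF);
    }

    static int unpackVersion(int header) {
      return header & 0xFFFF;
    }

    static int unpackTypeOrdinal(int header) {
      return header >>> 16;
    }

This is why a DataTableImplV3 payload, which starts with a plain version int, could be re-read
as a DataBlock; with the rows now handed over in object form, that trick is no longer needed on
this path.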
@@ -216,16 +213,13 @@ public class QueryRunner {
private static class LeafStageTransferableBlockOperator extends BaseOperator<TransferableBlock> {
private static final String EXPLAIN_NAME = "LEAF_STAGE_TRANSFER_OPERATOR";
- private final DataBlock _errorBlock;
- private final List<DataBlock> _baseDataBlocks;
- private final DataSchema _dataSchema;
- private boolean _hasTransferred;
+ private final InstanceResponseBlock _errorBlock;
+ private final List<InstanceResponseBlock> _baseResultBlock;
private int _currentIndex;
- private LeafStageTransferableBlockOperator(List<DataBlock> baseDataBlocks, DataSchema dataSchema) {
- _baseDataBlocks = baseDataBlocks;
- _dataSchema = dataSchema;
- _errorBlock = baseDataBlocks.stream().filter(e -> !e.getExceptions().isEmpty()).findFirst().orElse(null);
+ private LeafStageTransferableBlockOperator(List<InstanceResponseBlock> baseResultBlock, DataSchema dataSchema) {
+ _baseResultBlock = baseResultBlock;
+ _errorBlock = baseResultBlock.stream().filter(e -> !e.getExceptions().isEmpty()).findFirst().orElse(null);
_currentIndex = 0;
}
@@ -247,10 +241,18 @@ public class QueryRunner {
}
if (_errorBlock != null) {
_currentIndex = -1;
- return new TransferableBlock(_errorBlock);
+ return new TransferableBlock(DataBlockUtils.getErrorDataBlock(_errorBlock.getExceptions()));
} else {
- if (_currentIndex < _baseDataBlocks.size()) {
- return new TransferableBlock(_baseDataBlocks.get(_currentIndex++));
+ if (_currentIndex < _baseResultBlock.size()) {
+ InstanceResponseBlock responseBlock = _baseResultBlock.get(_currentIndex++);
+ BaseResultsBlock resultsBlock = responseBlock.getResultsBlock();
+ if (resultsBlock != null) {
+ return new TransferableBlock(toList(resultsBlock.getRows(responseBlock.getQueryContext())),
+ responseBlock.getDataSchema(), DataBlock.Type.ROW);
+ } else {
+ return new TransferableBlock(Collections.emptyList(), responseBlock.getDataSchema(),
+ DataBlock.Type.ROW);
+ }
} else {
_currentIndex = -1;
return new TransferableBlock(DataBlockUtils.getEndOfStreamDataBlock());
@@ -259,6 +261,21 @@ public class QueryRunner {
}
}
+ private static List<Object[]> toList(Collection<Object[]> collections) {
+ if (collections instanceof List) {
+ return (List<Object[]>) collections;
+ } else if (collections instanceof PriorityQueue) {
+ PriorityQueue<Object[]> priorityQueue = (PriorityQueue<Object[]>) collections;
+ List<Object[]> sortedList = new ArrayList<>(priorityQueue.size());
+ while (!priorityQueue.isEmpty()) {
+ sortedList.add(priorityQueue.poll());
+ }
+ return sortedList;
+ } else {
+ throw new UnsupportedOperationException("Unsupported collection type: " + collections.getClass());
+ }
+ }
+
private boolean isLeafStage(DistributedStagePlan distributedStagePlan) {
int stageId = distributedStagePlan.getStageId();
ServerInstance serverInstance = distributedStagePlan.getServerInstance();
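One note on the PriorityQueue branch in toList() above: PriorityQueue's iterator makes no
ordering guarantee, so draining via poll() is what preserves the comparator order of v1
order-by results. A tiny standalone demonstration (not Pinot code):

    import java.util.Arrays;
    import java.util.PriorityQueue;

    PriorityQueue<Integer> pq = new PriorityQueue<>(Arrays.asList(3, 1, 2));
    // Iteration order is unspecified (it may yield e.g. 1, 3, 2), but poll()
    // always yields comparator order: 1, 2, 3.
    while (!pq.isEmpty()) {
      System.out.print(pq.poll() + " ");
    }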
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/blocks/TransferableBlock.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/blocks/TransferableBlock.java
index 959578cbe9..ec89f0d1c9 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/blocks/TransferableBlock.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/blocks/TransferableBlock.java
@@ -18,6 +18,7 @@
*/
package org.apache.pinot.query.runtime.blocks;
+import com.google.common.annotations.VisibleForTesting;
import java.util.List;
import org.apache.pinot.common.datablock.ColumnarDataBlock;
import org.apache.pinot.common.datablock.DataBlock;
@@ -47,6 +48,12 @@ public class TransferableBlock implements Block {
private List<Object[]> _container;
public TransferableBlock(List<Object[]> container, DataSchema dataSchema, DataBlock.Type containerType) {
+ this(container, dataSchema, containerType, false);
+ }
+
+ @VisibleForTesting
+ TransferableBlock(List<Object[]> container, DataSchema dataSchema, DataBlock.Type containerType,
+ boolean isErrorBlock) {
_container = container;
_dataSchema = dataSchema;
_type = containerType;
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/blocks/TransferableBlockUtils.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/blocks/TransferableBlockUtils.java
index 81eb6b2505..66a8982486 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/blocks/TransferableBlockUtils.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/blocks/TransferableBlockUtils.java
@@ -23,13 +23,14 @@ import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
-import org.apache.pinot.common.datablock.BaseDataBlock;
import org.apache.pinot.common.datablock.DataBlock;
import org.apache.pinot.common.datablock.DataBlockUtils;
-import org.apache.pinot.common.datablock.RowDataBlock;
+import org.apache.pinot.common.utils.DataSchema;
public final class TransferableBlockUtils {
+ private static final int MEDIAN_COLUMN_SIZE_BYTES = 8;
+
private TransferableBlockUtils() {
// do not instantiate.
}
@@ -59,37 +60,80 @@ public final class TransferableBlockUtils {
}
/**
- * Split a block into multiple block so that each block size is within maxBlockSize.
- * Currently, we only support split for row type dataBlock.
- * For columnar data block, we return the original data block.
- * Metadata data block split is not supported.
+ * Split a block into multiple blocks. By default, no canonicalization is applied to the rows.
*
- * When row size is greater than maxBlockSize, we pack each row as a separate block.
+ * @see TransferableBlockUtils#splitBlock(TransferableBlock, DataBlock.Type, int, boolean)
*/
- public static List<TransferableBlock> splitBlock(TransferableBlock block, BaseDataBlock.Type type, int maxBlockSize) {
+ public static List<TransferableBlock> splitBlock(TransferableBlock block, DataBlock.Type type, int maxBlockSize) {
+ return splitBlock(block, type, maxBlockSize, false);
+ }
+
+ /**
+ *
+ * Split a block into multiple blocks so that each block's size stays within maxBlockSize. Currently,
+ * <ul>
+ *   <li>For row data blocks, we split the block into chunks of rows.</li>
+ *   <li>For columnar data blocks, an exception is thrown.</li>
+ *   <li>For metadata blocks, split is not supported; the block is returned as-is.</li>
+ * </ul>
+ *
+ * @param block the data block
+ * @param type type of block
+ * @param maxBlockSize each chunk of data is estimated to stay under maxBlockSize
+ * @param needsCanonicalize whether to canonicalize the input rows; set to true if the block is constructed
+ * from the leaf stage.
+ * @return a list of data block chunks
+ */
+ public static List<TransferableBlock> splitBlock(TransferableBlock block, DataBlock.Type type, int maxBlockSize,
+ boolean needsCanonicalize) {
List<TransferableBlock> blockChunks = new ArrayList<>();
- if (type != DataBlock.Type.ROW) {
- return Collections.singletonList(block);
- } else {
- int rowSizeInBytes = ((RowDataBlock) block.getDataBlock()).getRowSizeInBytes();
- int numRowsPerChunk = maxBlockSize / rowSizeInBytes;
+ if (type == DataBlock.Type.ROW) {
+ // Use an estimated row size; the estimate is rough and is only used to compute numRowsPerChunk.
+ int estimatedRowSizeInBytes = block.getDataSchema().getColumnNames().length * MEDIAN_COLUMN_SIZE_BYTES;
+ int numRowsPerChunk = maxBlockSize / estimatedRowSizeInBytes;
Preconditions.checkState(numRowsPerChunk > 0, "row size too large for query engine to handle, abort!");
int totalNumRows = block.getNumRows();
List<Object[]> allRows = block.getContainer();
+ DataSchema dataSchema = block.getDataSchema();
int currentRow = 0;
while (currentRow < totalNumRows) {
List<Object[]> chunk = allRows.subList(currentRow, Math.min(currentRow + numRowsPerChunk, allRows.size()));
+ if (needsCanonicalize) {
+ canonicalizeRows(chunk, dataSchema);
+ }
currentRow += numRowsPerChunk;
blockChunks.add(new TransferableBlock(chunk, block.getDataSchema(), block.getType()));
}
+ return blockChunks;
+ } else if (type == DataBlock.Type.METADATA) {
+ return Collections.singletonList(block);
+ } else {
+ throw new IllegalArgumentException("Unsupported data block type: " + type);
+ }
+ }
+
+ // Canonicalize rows in place.
+ private static void canonicalizeRows(List<Object[]> rows, DataSchema dataSchema) {
+ for (int i = 0; i < rows.size(); i++) {
+ rows.set(i, canonicalizeRow(rows.get(i), dataSchema));
}
- return blockChunks;
}
- public static Object[] getRow(TransferableBlock transferableBlock, int rowId) {
- Preconditions.checkState(transferableBlock.getType() == DataBlock.Type.ROW,
- "TransferableBlockUtils doesn't support get row from non-ROW-based data block type yet!");
- return transferableBlock.getContainer().get(rowId);
+ /**
+ * This util canonicalizes a row generated by the V1 engine, which is stored in the
+ * {@link DataSchema#getStoredColumnDataTypes()} format, whereas the transferable block ser/de stores data in the
+ * {@link DataSchema#getColumnDataTypes()} format.
+ *
+ * @param row the un-canonicalized row.
+ * @param dataSchema the data schema desired for the row.
+ * @return the canonicalized row.
+ */
+ private static Object[] canonicalizeRow(Object[] row, DataSchema dataSchema) {
+ Object[] resultRow = new Object[row.length];
+ for (int colId = 0; colId < row.length; colId++) {
+ resultRow[colId] = dataSchema.getColumnDataType(colId).convert(row[colId]);
+ }
+ return resultRow;
}
}
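To make the new chunking arithmetic concrete, here is a self-contained sketch of the ROW-block
path in splitBlock (the row count, column count, and maxBlockSize are made-up values for
illustration; MEDIAN_COLUMN_SIZE_BYTES = 8 as in the patch):

    import java.util.ArrayList;
    import java.util.List;

    public class SplitSketch {
      public static void main(String[] args) {
        int numColumns = 4;
        List<Object[]> allRows = new ArrayList<>();
        for (int i = 0; i < 100; i++) {
          allRows.add(new Object[numColumns]);
        }
        int maxBlockSize = 256;                                       // bytes; tiny on purpose
        int estimatedRowSizeInBytes = numColumns * 8;                 // 4 columns * MEDIAN_COLUMN_SIZE_BYTES
        int numRowsPerChunk = maxBlockSize / estimatedRowSizeInBytes; // 256 / 32 = 8 rows per chunk
        List<List<Object[]>> chunks = new ArrayList<>();
        for (int currentRow = 0; currentRow < allRows.size(); currentRow += numRowsPerChunk) {
          chunks.add(allRows.subList(currentRow, Math.min(currentRow + numRowsPerChunk, allRows.size())));
        }
        System.out.println(chunks.size()); // 13: twelve chunks of 8 rows plus one of 4
      }
    }

Two things worth noting: the size estimate is deliberately rough, since the exact
RowDataBlock.getRowSizeInBytes() is only known after building the serialized block this change
avoids; and when needsCanonicalize is set, each chunk's rows are converted from the V1 stored
types to the canonical ones via ColumnDataType.convert (e.g. BOOLEAN stored as int, TIMESTAMP
stored as long, as the deleted test helper above illustrates).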
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxSendOperator.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxSendOperator.java
index a26fa19626..71872ebefd 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxSendOperator.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxSendOperator.java
@@ -28,7 +28,7 @@ import java.util.Random;
import java.util.Set;
import javax.annotation.Nullable;
import org.apache.calcite.rel.RelDistribution;
-import org.apache.pinot.common.datablock.BaseDataBlock;
+import org.apache.pinot.common.datablock.DataBlock;
import org.apache.pinot.common.utils.DataSchema;
import org.apache.pinot.core.common.Operator;
import org.apache.pinot.core.operator.BaseOperator;
@@ -67,12 +67,13 @@ public class MailboxSendOperator extends BaseOperator<TransferableBlock> {
private final int _stageId;
private final MailboxService<TransferableBlock> _mailboxService;
private final DataSchema _dataSchema;
+ private final boolean _isLeafStageSender;
private Operator<TransferableBlock> _dataTableBlockBaseOperator;
public MailboxSendOperator(MailboxService<TransferableBlock> mailboxService, DataSchema dataSchema,
Operator<TransferableBlock> dataTableBlockBaseOperator, List<ServerInstance> receivingStageInstances,
RelDistribution.Type exchangeType, KeySelector<Object[], Object[]> keySelector, String hostName, int port,
- long jobId, int stageId) {
+ long jobId, int stageId, boolean isLeafStageSender) {
_dataSchema = dataSchema;
_mailboxService = mailboxService;
_dataTableBlockBaseOperator = dataTableBlockBaseOperator;
@@ -98,6 +99,7 @@ public class MailboxSendOperator extends BaseOperator<TransferableBlock> {
_stageId = stageId;
Preconditions.checkState(SUPPORTED_EXCHANGE_TYPE.contains(_exchangeType),
String.format("Exchange type '%s' is not supported yet", _exchangeType));
+ _isLeafStageSender = isLeafStageSender;
}
@Override
@@ -129,7 +131,7 @@ public class MailboxSendOperator extends BaseOperator<TransferableBlock> {
}
boolean isEndOfStream = TransferableBlockUtils.isEndOfStream(transferableBlock);
- BaseDataBlock.Type type = transferableBlock.getType();
+ DataBlock.Type type = transferableBlock.getType();
try {
switch (_exchangeType) {
case SINGLETON:
@@ -182,8 +184,7 @@ public class MailboxSendOperator extends BaseOperator<TransferableBlock> {
for (int i = 0; i < partitionSize; i++) {
temporaryRows.add(new ArrayList<>());
}
- for (int rowId = 0; rowId < transferableBlock.getNumRows(); rowId++) {
- Object[] row = TransferableBlockUtils.getRow(transferableBlock, rowId);
+ for (Object[] row : transferableBlock.getContainer()) {
int partitionId = keySelector.computeHash(row) % partitionSize;
temporaryRows.get(partitionId).add(row);
}
@@ -197,7 +198,7 @@ public class MailboxSendOperator extends BaseOperator<TransferableBlock> {
}
private void sendDataTableBlockToServers(List<ServerInstance> servers, TransferableBlock transferableBlock,
- BaseDataBlock.Type type, boolean isEndOfStream) {
+ DataBlock.Type type, boolean isEndOfStream) {
if (isEndOfStream) {
for (ServerInstance server : servers) {
sendDataTableBlock(server, transferableBlock, true);
@@ -205,7 +206,7 @@ public class MailboxSendOperator extends BaseOperator<TransferableBlock> {
} else {
// Split the block only when it is not end of stream block.
List<TransferableBlock> chunks = TransferableBlockUtils.splitBlock(transferableBlock, type,
- MAX_MAILBOX_CONTENT_SIZE_BYTES);
+ MAX_MAILBOX_CONTENT_SIZE_BYTES, _isLeafStageSender);
for (ServerInstance server : servers) {
for (TransferableBlock chunk : chunks) {
sendDataTableBlock(server, chunk, false);
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/PhysicalPlanVisitor.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/PhysicalPlanVisitor.java
index 6ba6d6b4c1..57000e1267 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/PhysicalPlanVisitor.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/PhysicalPlanVisitor.java
@@ -76,7 +76,7 @@ public class PhysicalPlanVisitor implements StageNodeVisitor<Operator<Transferab
StageMetadata receivingStageMetadata = context.getMetadataMap().get(node.getReceiverStageId());
return new MailboxSendOperator(context.getMailboxService(), node.getDataSchema(), nextOperator,
receivingStageMetadata.getServerInstances(), node.getExchangeType(), node.getPartitionKeySelector(),
- context.getHostName(), context.getPort(), context.getRequestId(), node.getStageId());
+ context.getHostName(), context.getPort(), context.getRequestId(), node.getStageId(), false);
}
@Override
diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/TransferableBlockUtilsTest.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/TransferableBlockUtilsTest.java
index 1d30274588..9d64592e94 100644
--- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/TransferableBlockUtilsTest.java
+++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/TransferableBlockUtilsTest.java
@@ -38,6 +38,7 @@ import org.testng.annotations.Test;
public class TransferableBlockUtilsTest {
private static final int TOTAL_ROW_COUNT = 50;
+ private static final int TEST_EST_BYTES_PER_COLUMN = 8;
private static final List<DataSchema.ColumnDataType> EXCLUDE_DATA_TYPES = ImmutableList.of(
DataSchema.ColumnDataType.OBJECT, DataSchema.ColumnDataType.JSON, DataSchema.ColumnDataType.BYTES,
DataSchema.ColumnDataType.BYTES_ARRAY);
@@ -67,15 +68,15 @@ public class TransferableBlockUtilsTest {
public void testSplitBlockUtils(int splitRowCount)
throws Exception {
DataSchema dataSchema = getDataSchema();
- List<Object[]> rows = DataBlockTestUtils.getRandomRows(dataSchema, TOTAL_ROW_COUNT, 1);
// compare serialized split
+ int estRowSizeInBytes = dataSchema.size() * TEST_EST_BYTES_PER_COLUMN;
+ List<Object[]> rows = DataBlockTestUtils.getRandomRows(dataSchema, TOTAL_ROW_COUNT, 1);
RowDataBlock rowBlock = DataBlockBuilder.buildFromRows(rows, dataSchema);
- int rowSizeInBytes = rowBlock.getRowSizeInBytes();
validateBlocks(TransferableBlockUtils.splitBlock(new TransferableBlock(rowBlock),
- DataBlock.Type.ROW, rowSizeInBytes * splitRowCount + 1), rows, dataSchema);
+ DataBlock.Type.ROW, estRowSizeInBytes * splitRowCount + 1), rows, dataSchema);
// compare non-serialized split
validateBlocks(TransferableBlockUtils.splitBlock(new TransferableBlock(rows, dataSchema, DataBlock.Type.ROW),
- DataBlock.Type.ROW, rowSizeInBytes * splitRowCount + 1), rows, dataSchema);
+ DataBlock.Type.ROW, estRowSizeInBytes * splitRowCount + 1), rows, dataSchema);
}
@Test
diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/testutils/MockDataBlockOperatorFactory.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/testutils/MockDataBlockOperatorFactory.java
index a85ae59e6d..0140576981 100644
--- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/testutils/MockDataBlockOperatorFactory.java
+++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/testutils/MockDataBlockOperatorFactory.java
@@ -22,7 +22,7 @@ import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import org.apache.pinot.common.datablock.BaseDataBlock;
+import org.apache.pinot.common.datablock.DataBlock;
import org.apache.pinot.common.utils.DataSchema;
import org.apache.pinot.core.operator.BaseOperator;
import org.apache.pinot.query.runtime.blocks.TransferableBlock;
@@ -62,7 +62,7 @@ public class MockDataBlockOperatorFactory {
return _invocationCount >= _rowsMap.get(operatorName).size()
? TransferableBlockUtils.getEndOfStreamTransferableBlock()
: new TransferableBlock(_rowsMap.get(operatorName).get(_invocationCount++),
- _operatorSchemaMap.get(operatorName), BaseDataBlock.Type.ROW);
+ _operatorSchemaMap.get(operatorName), DataBlock.Type.ROW);
}
});
return operator;