Posted to commits@pinot.apache.org by ro...@apache.org on 2022/06/08 06:03:07 UTC
[pinot] 08/11: row/columnar compatible block (#8583)
This is an automated email from the ASF dual-hosted git repository.
rongr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
commit 30e21eb394a9d022a40bca26b7dcf5e4641f688f
Author: Rong Rong <wa...@gmail.com>
AuthorDate: Tue May 10 14:30:46 2022 -0700
row/columnar compatible block (#8583)
adding tests and verify both row/columnar format
adding test and validating all types
fix and add query dispatcher test
Co-authored-by: Rong Rong <ro...@startree.ai>
---
.../apache/pinot/query/runtime/QueryRunner.java | 15 +-
.../pinot/query/runtime/blocks/BaseDataBlock.java | 697 +++++++++++++++++++++
.../query/runtime/blocks/ColumnarDataBlock.java | 96 +++
.../query/runtime/blocks/DataBlockBuilder.java | 406 ++++++++++++
.../pinot/query/runtime/blocks/DataBlockUtils.java | 174 +++++
.../query/runtime/blocks/DataTableBlockUtils.java | 71 ---
.../pinot/query/runtime/blocks/MetadataBlock.java | 66 ++
.../pinot/query/runtime/blocks/RowDataBlock.java | 90 +++
...{DataTableBlock.java => TransferableBlock.java} | 37 +-
.../runtime/executor/WorkerQueryExecutor.java | 16 +-
.../query/runtime/operator/HashJoinOperator.java | 51 +-
.../runtime/operator/MailboxReceiveOperator.java | 19 +-
.../runtime/operator/MailboxSendOperator.java | 48 +-
.../pinot/query/service/QueryDispatcher.java | 54 +-
.../query/mailbox/GrpcMailboxServiceTest.java | 7 +-
.../pinot/query/runtime/QueryRunnerTest.java | 52 +-
.../pinot/query/runtime/blocks/DataBlockTest.java | 80 +++
.../query/runtime/blocks/DataBlockTestUtils.java | 181 ++++++
.../pinot/query/service/QueryDispatcherTest.java | 92 +++
19 files changed, 2034 insertions(+), 218 deletions(-)
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java
index f33cde43c2..e25c6e03b8 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java
@@ -18,6 +18,8 @@
*/
package org.apache.pinot.query.runtime;
+import java.io.IOException;
+import java.nio.ByteBuffer;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutorService;
@@ -32,6 +34,8 @@ import org.apache.pinot.query.mailbox.GrpcMailboxService;
import org.apache.pinot.query.mailbox.MailboxService;
import org.apache.pinot.query.planner.StageMetadata;
import org.apache.pinot.query.planner.stage.MailboxSendNode;
+import org.apache.pinot.query.runtime.blocks.BaseDataBlock;
+import org.apache.pinot.query.runtime.blocks.DataBlockUtils;
import org.apache.pinot.query.runtime.executor.WorkerQueryExecutor;
import org.apache.pinot.query.runtime.operator.MailboxSendOperator;
import org.apache.pinot.query.runtime.plan.DistributedStagePlan;
@@ -94,12 +98,19 @@ public class QueryRunner {
ServerRequestUtils.constructServerQueryRequest(distributedStagePlan, requestMetadataMap);
// send the data table via mailbox in one-off fashion (e.g. no block-level split, one data table/partition key)
- DataTable dataTable = _serverExecutor.processQuery(serverQueryRequest, executorService, null);
+ BaseDataBlock dataBlock;
+ try {
+ DataTable dataTable = _serverExecutor.processQuery(serverQueryRequest, executorService, null);
+ // this works because default DataTableImplV3 will have ordinal 0, which maps to ROW(0)
+ dataBlock = DataBlockUtils.getDataBlock(ByteBuffer.wrap(dataTable.toBytes()));
+ } catch (IOException e) {
+ throw new RuntimeException("Unable to convert byte buffer", e);
+ }
MailboxSendNode sendNode = (MailboxSendNode) distributedStagePlan.getStageRoot();
StageMetadata receivingStageMetadata = distributedStagePlan.getMetadataMap().get(sendNode.getReceiverStageId());
MailboxSendOperator mailboxSendOperator =
- new MailboxSendOperator(_mailboxService, dataTable, receivingStageMetadata.getServerInstances(),
+ new MailboxSendOperator(_mailboxService, dataBlock, receivingStageMetadata.getServerInstances(),
sendNode.getExchangeType(), sendNode.getPartitionKeySelector(), _hostname, _port,
serverQueryRequest.getRequestId(), sendNode.getStageId());
mailboxSendOperator.nextBlock();
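
For context, the conversion the new code performs is: serialize the DataTable to bytes, then re-read those bytes as a data block. A minimal sketch of that path (the helper class name is hypothetical; the DataBlockUtils call is the one added in this commit):

    import java.io.IOException;
    import java.nio.ByteBuffer;
    import org.apache.pinot.common.utils.DataTable;
    import org.apache.pinot.query.runtime.blocks.BaseDataBlock;
    import org.apache.pinot.query.runtime.blocks.DataBlockUtils;

    final class DataTableToBlock {
      private DataTableToBlock() {
      }

      // Wrap a serialized DataTable into a BaseDataBlock, mirroring the QueryRunner change.
      static BaseDataBlock toDataBlock(DataTable dataTable) {
        try {
          // DataTableImplV3 writes a leading version int with no type bits set, so
          // DataBlockUtils decodes the type ordinal as 0, i.e. Type.ROW.
          return DataBlockUtils.getDataBlock(ByteBuffer.wrap(dataTable.toBytes()));
        } catch (IOException e) {
          throw new RuntimeException("Unable to convert byte buffer", e);
        }
      }
    }
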
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/blocks/BaseDataBlock.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/blocks/BaseDataBlock.java
new file mode 100644
index 0000000000..97d007b58b
--- /dev/null
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/blocks/BaseDataBlock.java
@@ -0,0 +1,697 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.query.runtime.blocks;
+
+import com.google.common.primitives.Ints;
+import com.google.common.primitives.Longs;
+import java.io.ByteArrayOutputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.math.BigDecimal;
+import java.nio.ByteBuffer;
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.pinot.common.response.ProcessingException;
+import org.apache.pinot.common.utils.DataSchema;
+import org.apache.pinot.common.utils.DataTable;
+import org.apache.pinot.core.common.ObjectSerDeUtils;
+import org.apache.pinot.core.common.datatable.DataTableUtils;
+import org.apache.pinot.core.query.request.context.ThreadTimer;
+import org.apache.pinot.spi.utils.BigDecimalUtils;
+import org.apache.pinot.spi.utils.ByteArray;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+
+
+/**
+ * Base data block, mostly replicating the implementation of
+ * {@link org.apache.pinot.core.common.datatable.DataTableImplV3}.
+ *
+ * +-----------------------------------------------+
+ * | 13 integers of header: |
+ * | VERSION |
+ * | NUM_ROWS |
+ * | NUM_COLUMNS |
+ * | EXCEPTIONS SECTION START OFFSET |
+ * | EXCEPTIONS SECTION LENGTH |
+ * | DICTIONARY_MAP SECTION START OFFSET |
+ * | DICTIONARY_MAP SECTION LENGTH |
+ * | DATA_SCHEMA SECTION START OFFSET |
+ * | DATA_SCHEMA SECTION LENGTH |
+ * | FIXED_SIZE_DATA SECTION START OFFSET |
+ * | FIXED_SIZE_DATA SECTION LENGTH |
+ * | VARIABLE_SIZE_DATA SECTION START OFFSET |
+ * | VARIABLE_SIZE_DATA SECTION LENGTH |
+ * +-----------------------------------------------+
+ * | EXCEPTIONS SECTION |
+ * +-----------------------------------------------+
+ * | DICTIONARY_MAP SECTION |
+ * +-----------------------------------------------+
+ * | DATA_SCHEMA SECTION |
+ * +-----------------------------------------------+
+ * | FIXED_SIZE_DATA SECTION |
+ * +-----------------------------------------------+
+ * | VARIABLE_SIZE_DATA SECTION |
+ * +-----------------------------------------------+
+ * | METADATA LENGTH |
+ * | METADATA SECTION |
+ * +-----------------------------------------------+
+ *
+ * To support both row and columnar data formats, the size of the data payload is kept exactly the same; the only
+ * difference is the data layout within the FIXED_SIZE_DATA and VARIABLE_SIZE_DATA sections. See each implementation
+ * for details.
+ */
+@SuppressWarnings("DuplicatedCode")
+public abstract class BaseDataBlock implements DataTable {
+ protected static final int HEADER_SIZE = Integer.BYTES * 13;
+ // _errCodeToExceptionMap stores exceptions as a map of errorCode->errorMessage
+ protected Map<Integer, String> _errCodeToExceptionMap;
+
+ protected int _numRows;
+ protected int _numColumns;
+ protected DataSchema _dataSchema;
+ protected Map<String, Map<Integer, String>> _dictionaryMap;
+ protected byte[] _fixedSizeDataBytes;
+ protected ByteBuffer _fixedSizeData;
+ protected byte[] _variableSizeDataBytes;
+ protected ByteBuffer _variableSizeData;
+ protected Map<String, String> _metadata;
+
+ /**
+   * Construct a base data block.
+ * @param numRows num of rows in the block
+ * @param dataSchema schema of the data in the block
+ * @param dictionaryMap dictionary encoding map
+   * @param fixedSizeDataBytes byte[] for fixed-size columns.
+   * @param variableSizeDataBytes byte[] for variable-length columns (arrays).
+ */
+ public BaseDataBlock(int numRows, DataSchema dataSchema, Map<String, Map<Integer, String>> dictionaryMap,
+ byte[] fixedSizeDataBytes, byte[] variableSizeDataBytes) {
+ _numRows = numRows;
+ _numColumns = dataSchema.size();
+ _dataSchema = dataSchema;
+ _dictionaryMap = dictionaryMap;
+ _fixedSizeDataBytes = fixedSizeDataBytes;
+ _fixedSizeData = ByteBuffer.wrap(fixedSizeDataBytes);
+ _variableSizeDataBytes = variableSizeDataBytes;
+ _variableSizeData = ByteBuffer.wrap(variableSizeDataBytes);
+ _metadata = new HashMap<>();
+ _errCodeToExceptionMap = new HashMap<>();
+ }
+
+ /**
+   * Construct an empty data block.
+ */
+ public BaseDataBlock() {
+ _numRows = 0;
+ _numColumns = 0;
+ _dataSchema = null;
+ _dictionaryMap = null;
+ _fixedSizeDataBytes = null;
+ _fixedSizeData = null;
+ _variableSizeDataBytes = null;
+ _variableSizeData = null;
+ _metadata = new HashMap<>();
+ _errCodeToExceptionMap = new HashMap<>();
+ }
+
+ public BaseDataBlock(ByteBuffer byteBuffer)
+ throws IOException {
+ // Read header.
+ _numRows = byteBuffer.getInt();
+ _numColumns = byteBuffer.getInt();
+ int exceptionsStart = byteBuffer.getInt();
+ int exceptionsLength = byteBuffer.getInt();
+ int dictionaryMapStart = byteBuffer.getInt();
+ int dictionaryMapLength = byteBuffer.getInt();
+ int dataSchemaStart = byteBuffer.getInt();
+ int dataSchemaLength = byteBuffer.getInt();
+ int fixedSizeDataStart = byteBuffer.getInt();
+ int fixedSizeDataLength = byteBuffer.getInt();
+ int variableSizeDataStart = byteBuffer.getInt();
+ int variableSizeDataLength = byteBuffer.getInt();
+
+ // Read exceptions.
+ if (exceptionsLength != 0) {
+ byteBuffer.position(exceptionsStart);
+ _errCodeToExceptionMap = deserializeExceptions(byteBuffer);
+ } else {
+ _errCodeToExceptionMap = new HashMap<>();
+ }
+
+ // Read dictionary.
+ if (dictionaryMapLength != 0) {
+ byteBuffer.position(dictionaryMapStart);
+ _dictionaryMap = deserializeDictionaryMap(byteBuffer);
+ } else {
+ _dictionaryMap = null;
+ }
+
+ // Read data schema.
+ if (dataSchemaLength != 0) {
+ byteBuffer.position(dataSchemaStart);
+ _dataSchema = DataSchema.fromBytes(byteBuffer);
+ } else {
+ _dataSchema = null;
+ }
+
+ // Read fixed size data.
+ if (fixedSizeDataLength != 0) {
+ _fixedSizeDataBytes = new byte[fixedSizeDataLength];
+ byteBuffer.position(fixedSizeDataStart);
+ byteBuffer.get(_fixedSizeDataBytes);
+ _fixedSizeData = ByteBuffer.wrap(_fixedSizeDataBytes);
+ } else {
+ _fixedSizeDataBytes = null;
+ _fixedSizeData = null;
+ }
+
+ // Read variable size data.
+ if (variableSizeDataLength != 0) {
+ _variableSizeDataBytes = new byte[variableSizeDataLength];
+ byteBuffer.position(variableSizeDataStart);
+ byteBuffer.get(_variableSizeDataBytes);
+ _variableSizeData = ByteBuffer.wrap(_variableSizeDataBytes);
+ } else {
+ _variableSizeDataBytes = null;
+ _variableSizeData = null;
+ }
+
+ // Read metadata.
+ int metadataLength = byteBuffer.getInt();
+ if (metadataLength != 0) {
+ _metadata = deserializeMetadata(byteBuffer);
+ }
+ }
+
+ /**
+   * Return the serialized int form of the data block version and type: the version occupies the low bits and the
+   * type ordinal the high bits (see {@code DataBlockUtils.VERSION_TYPE_SHIFT}).
+   * @return the packed version-type int.
+ */
+ protected abstract int getDataBlockVersionType();
+
+ /**
+   * Position the {@code _fixedSizeData} buffer at the start of the cell for the given row/column ID.
+ * @param rowId row ID
+ * @param colId column ID
+ */
+ protected abstract void positionCursorInFixSizedBuffer(int rowId, int colId);
+
+ /**
+   * Position the {@code _variableSizeData} buffer at the start of the cell for the given row/column ID, and return
+   * the number of bytes to extract from the variable-size buffer.
+ *
+ * @param rowId row ID
+ * @param colId column ID
+ * @return the length to extract from variable size buffer.
+ */
+ protected abstract int positionCursorInVariableBuffer(int rowId, int colId);
+
+ @Override
+ public Map<String, String> getMetadata() {
+ return _metadata;
+ }
+
+ @Override
+ public DataSchema getDataSchema() {
+ return _dataSchema;
+ }
+
+ @Override
+ public int getNumberOfRows() {
+ return _numRows;
+ }
+
+ // --------------------------------------------------------------------------
+ // Fixed sized element access.
+ // --------------------------------------------------------------------------
+
+ @Override
+ public int getInt(int rowId, int colId) {
+ positionCursorInFixSizedBuffer(rowId, colId);
+ return _fixedSizeData.getInt();
+ }
+
+ @Override
+ public long getLong(int rowId, int colId) {
+ positionCursorInFixSizedBuffer(rowId, colId);
+ return _fixedSizeData.getLong();
+ }
+
+ @Override
+ public float getFloat(int rowId, int colId) {
+ positionCursorInFixSizedBuffer(rowId, colId);
+ return _fixedSizeData.getFloat();
+ }
+
+ @Override
+ public double getDouble(int rowId, int colId) {
+ positionCursorInFixSizedBuffer(rowId, colId);
+ return _fixedSizeData.getDouble();
+ }
+
+ @Override
+ public BigDecimal getBigDecimal(int rowId, int colId) {
+ int size = positionCursorInVariableBuffer(rowId, colId);
+ ByteBuffer byteBuffer = _variableSizeData.slice();
+ byteBuffer.limit(size);
+ return BigDecimalUtils.deserialize(byteBuffer);
+ }
+
+ @Override
+ public String getString(int rowId, int colId) {
+ positionCursorInFixSizedBuffer(rowId, colId);
+ int dictId = _fixedSizeData.getInt();
+ return _dictionaryMap.get(_dataSchema.getColumnName(colId)).get(dictId);
+ }
+
+ @Override
+ public ByteArray getBytes(int rowId, int colId) {
+ int size = positionCursorInVariableBuffer(rowId, colId);
+ byte[] buffer = new byte[size];
+ _variableSizeData.get(buffer);
+ return new ByteArray(buffer);
+ }
+
+ // --------------------------------------------------------------------------
+ // Variable sized element access.
+ // --------------------------------------------------------------------------
+
+ @Override
+ public <T> T getObject(int rowId, int colId) {
+ int size = positionCursorInVariableBuffer(rowId, colId);
+ int objectTypeValue = _variableSizeData.getInt();
+ ByteBuffer byteBuffer = _variableSizeData.slice();
+ byteBuffer.limit(size);
+ return ObjectSerDeUtils.deserialize(byteBuffer, objectTypeValue);
+ }
+
+ @Override
+ public int[] getIntArray(int rowId, int colId) {
+ int length = positionCursorInVariableBuffer(rowId, colId);
+ int[] ints = new int[length];
+ for (int i = 0; i < length; i++) {
+ ints[i] = _variableSizeData.getInt();
+ }
+ return ints;
+ }
+
+ @Override
+ public long[] getLongArray(int rowId, int colId) {
+ int length = positionCursorInVariableBuffer(rowId, colId);
+ long[] longs = new long[length];
+ for (int i = 0; i < length; i++) {
+ longs[i] = _variableSizeData.getLong();
+ }
+ return longs;
+ }
+
+ @Override
+ public float[] getFloatArray(int rowId, int colId) {
+ int length = positionCursorInVariableBuffer(rowId, colId);
+ float[] floats = new float[length];
+ for (int i = 0; i < length; i++) {
+ floats[i] = _variableSizeData.getFloat();
+ }
+ return floats;
+ }
+
+ @Override
+ public double[] getDoubleArray(int rowId, int colId) {
+ int length = positionCursorInVariableBuffer(rowId, colId);
+ double[] doubles = new double[length];
+ for (int i = 0; i < length; i++) {
+ doubles[i] = _variableSizeData.getDouble();
+ }
+ return doubles;
+ }
+
+ @Override
+ public String[] getStringArray(int rowId, int colId) {
+ int length = positionCursorInVariableBuffer(rowId, colId);
+ String[] strings = new String[length];
+ Map<Integer, String> dictionary = _dictionaryMap.get(_dataSchema.getColumnName(colId));
+ for (int i = 0; i < length; i++) {
+ strings[i] = dictionary.get(_variableSizeData.getInt());
+ }
+ return strings;
+ }
+
+ // --------------------------------------------------------------------------
+ // Ser/De and exception handling
+ // --------------------------------------------------------------------------
+
+ /**
+ * Helper method to serialize dictionary map.
+ */
+ protected byte[] serializeDictionaryMap()
+ throws IOException {
+ ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
+ DataOutputStream dataOutputStream = new DataOutputStream(byteArrayOutputStream);
+
+ dataOutputStream.writeInt(_dictionaryMap.size());
+ for (Map.Entry<String, Map<Integer, String>> dictionaryMapEntry : _dictionaryMap.entrySet()) {
+ String columnName = dictionaryMapEntry.getKey();
+ Map<Integer, String> dictionary = dictionaryMapEntry.getValue();
+ byte[] bytes = columnName.getBytes(UTF_8);
+ dataOutputStream.writeInt(bytes.length);
+ dataOutputStream.write(bytes);
+ dataOutputStream.writeInt(dictionary.size());
+
+ for (Map.Entry<Integer, String> dictionaryEntry : dictionary.entrySet()) {
+ dataOutputStream.writeInt(dictionaryEntry.getKey());
+ byte[] valueBytes = dictionaryEntry.getValue().getBytes(UTF_8);
+ dataOutputStream.writeInt(valueBytes.length);
+ dataOutputStream.write(valueBytes);
+ }
+ }
+
+ return byteArrayOutputStream.toByteArray();
+ }
+
+ /**
+ * Helper method to deserialize dictionary map.
+ */
+ protected Map<String, Map<Integer, String>> deserializeDictionaryMap(ByteBuffer buffer)
+ throws IOException {
+ int numDictionaries = buffer.getInt();
+ Map<String, Map<Integer, String>> dictionaryMap = new HashMap<>(numDictionaries);
+
+ for (int i = 0; i < numDictionaries; i++) {
+ String column = DataTableUtils.decodeString(buffer);
+ int dictionarySize = buffer.getInt();
+ Map<Integer, String> dictionary = new HashMap<>(dictionarySize);
+ for (int j = 0; j < dictionarySize; j++) {
+ int key = buffer.getInt();
+ String value = DataTableUtils.decodeString(buffer);
+ dictionary.put(key, value);
+ }
+ dictionaryMap.put(column, dictionary);
+ }
+
+ return dictionaryMap;
+ }
+
+ @Override
+ public void addException(ProcessingException processingException) {
+ _errCodeToExceptionMap.put(processingException.getErrorCode(), processingException.getMessage());
+ }
+
+ @Override
+ public void addException(int errCode, String errMsg) {
+ _errCodeToExceptionMap.put(errCode, errMsg);
+ }
+
+ @Override
+ public Map<Integer, String> getExceptions() {
+ return _errCodeToExceptionMap;
+ }
+
+ @Override
+ public byte[] toBytes()
+ throws IOException {
+ ThreadTimer threadTimer = new ThreadTimer();
+
+ ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
+ DataOutputStream dataOutputStream = new DataOutputStream(byteArrayOutputStream);
+ writeLeadingSections(dataOutputStream);
+
+ // Add table serialization time metadata if thread timer is enabled.
+ if (ThreadTimer.isThreadCpuTimeMeasurementEnabled()) {
+ long responseSerializationCpuTimeNs = threadTimer.getThreadTimeNs();
+ getMetadata().put(MetadataKey.RESPONSE_SER_CPU_TIME_NS.getName(), String.valueOf(responseSerializationCpuTimeNs));
+ }
+
+ // Write metadata: length followed by actual metadata bytes.
+ // NOTE: We ignore metadata serialization time in "responseSerializationCpuTimeNs" as it's negligible while
+ // considering it will bring a lot code complexity.
+ byte[] metadataBytes = serializeMetadata();
+ dataOutputStream.writeInt(metadataBytes.length);
+ dataOutputStream.write(metadataBytes);
+
+ return byteArrayOutputStream.toByteArray();
+ }
+
+ private void writeLeadingSections(DataOutputStream dataOutputStream)
+ throws IOException {
+ dataOutputStream.writeInt(getDataBlockVersionType());
+ dataOutputStream.writeInt(_numRows);
+ dataOutputStream.writeInt(_numColumns);
+ int dataOffset = HEADER_SIZE;
+
+ // Write exceptions section offset(START|SIZE).
+ dataOutputStream.writeInt(dataOffset);
+ byte[] exceptionsBytes;
+ exceptionsBytes = serializeExceptions();
+ dataOutputStream.writeInt(exceptionsBytes.length);
+ dataOffset += exceptionsBytes.length;
+
+ // Write dictionary map section offset(START|SIZE).
+ dataOutputStream.writeInt(dataOffset);
+ byte[] dictionaryMapBytes = null;
+ if (_dictionaryMap != null) {
+ dictionaryMapBytes = serializeDictionaryMap();
+ dataOutputStream.writeInt(dictionaryMapBytes.length);
+ dataOffset += dictionaryMapBytes.length;
+ } else {
+ dataOutputStream.writeInt(0);
+ }
+
+ // Write data schema section offset(START|SIZE).
+ dataOutputStream.writeInt(dataOffset);
+ byte[] dataSchemaBytes = null;
+ if (_dataSchema != null) {
+ dataSchemaBytes = _dataSchema.toBytes();
+ dataOutputStream.writeInt(dataSchemaBytes.length);
+ dataOffset += dataSchemaBytes.length;
+ } else {
+ dataOutputStream.writeInt(0);
+ }
+
+ // Write fixed size data section offset(START|SIZE).
+ dataOutputStream.writeInt(dataOffset);
+ if (_fixedSizeDataBytes != null) {
+ dataOutputStream.writeInt(_fixedSizeDataBytes.length);
+ dataOffset += _fixedSizeDataBytes.length;
+ } else {
+ dataOutputStream.writeInt(0);
+ }
+
+ // Write variable size data section offset(START|SIZE).
+ dataOutputStream.writeInt(dataOffset);
+ if (_variableSizeDataBytes != null) {
+ dataOutputStream.writeInt(_variableSizeDataBytes.length);
+ } else {
+ dataOutputStream.writeInt(0);
+ }
+
+ // Write actual data.
+ // Write exceptions bytes.
+ dataOutputStream.write(exceptionsBytes);
+ // Write dictionary map bytes.
+ if (dictionaryMapBytes != null) {
+ dataOutputStream.write(dictionaryMapBytes);
+ }
+ // Write data schema bytes.
+ if (dataSchemaBytes != null) {
+ dataOutputStream.write(dataSchemaBytes);
+ }
+ // Write fixed size data bytes.
+ if (_fixedSizeDataBytes != null) {
+ dataOutputStream.write(_fixedSizeDataBytes);
+ }
+ // Write variable size data bytes.
+ if (_variableSizeDataBytes != null) {
+ dataOutputStream.write(_variableSizeDataBytes);
+ }
+ }
+
+ /**
+ * Serialize metadata section to bytes.
+ * Format of the bytes looks like:
+   * [numEntries, bytesOfKV1, bytesOfKV2, bytesOfKV3]
+ * For each KV pair:
+ * - if the value type is String, encode it as: [enumKeyOrdinal, valueLength, Utf8EncodedValue].
+ * - if the value type is int, encode it as: [enumKeyOrdinal, bigEndianRepresentationOfIntValue]
+ * - if the value type is long, encode it as: [enumKeyOrdinal, bigEndianRepresentationOfLongValue]
+ *
+   * Unlike V2, where numeric metadata values (int and long) are encoded as UTF-8 strings in the wire format,
+   * V3 uses the big-endian binary representation.
+ */
+ private byte[] serializeMetadata()
+ throws IOException {
+ ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
+ DataOutputStream dataOutputStream = new DataOutputStream(byteArrayOutputStream);
+
+ dataOutputStream.writeInt(_metadata.size());
+
+ for (Map.Entry<String, String> entry : _metadata.entrySet()) {
+ MetadataKey key = MetadataKey.getByName(entry.getKey());
+ // Ignore unknown keys.
+ if (key == null) {
+ continue;
+ }
+ String value = entry.getValue();
+ dataOutputStream.writeInt(key.ordinal());
+ if (key.getValueType() == MetadataValueType.INT) {
+ dataOutputStream.write(Ints.toByteArray(Integer.parseInt(value)));
+ } else if (key.getValueType() == MetadataValueType.LONG) {
+ dataOutputStream.write(Longs.toByteArray(Long.parseLong(value)));
+ } else {
+ byte[] valueBytes = value.getBytes(UTF_8);
+ dataOutputStream.writeInt(valueBytes.length);
+ dataOutputStream.write(valueBytes);
+ }
+ }
+
+ return byteArrayOutputStream.toByteArray();
+ }
+
+ /**
+   * Even though the V3 wire format uses UTF-8 for string/bytes values and big-endian for numeric values, the
+   * in-memory representation is String-based, both before serialization (when the server adds statistics to the
+   * metadata) and after deserialization (when the broker aggregates the values it receives from each server).
+   * This keeps the consumers of the {@code Map<String, String> getMetadata()} API unchanged; the conversion happens
+   * internally.
+   *
+   * This method uses relative operations on the ByteBuffer and expects the buffer's position to be set correctly.
+ */
+ private Map<String, String> deserializeMetadata(ByteBuffer buffer)
+ throws IOException {
+ int numEntries = buffer.getInt();
+ Map<String, String> metadata = new HashMap<>();
+ for (int i = 0; i < numEntries; i++) {
+ int keyId = buffer.getInt();
+ MetadataKey key = MetadataKey.getByOrdinal(keyId);
+ // Ignore unknown keys.
+ if (key == null) {
+ continue;
+ }
+ if (key.getValueType() == MetadataValueType.INT) {
+ String value = "" + buffer.getInt();
+ metadata.put(key.getName(), value);
+ } else if (key.getValueType() == MetadataValueType.LONG) {
+ String value = "" + buffer.getLong();
+ metadata.put(key.getName(), value);
+ } else {
+ String value = DataTableUtils.decodeString(buffer);
+ metadata.put(key.getName(), value);
+ }
+ }
+ return metadata;
+ }
+
+ private byte[] serializeExceptions()
+ throws IOException {
+ ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
+ DataOutputStream dataOutputStream = new DataOutputStream(byteArrayOutputStream);
+
+ dataOutputStream.writeInt(_errCodeToExceptionMap.size());
+
+ for (Map.Entry<Integer, String> entry : _errCodeToExceptionMap.entrySet()) {
+ int key = entry.getKey();
+ String value = entry.getValue();
+ byte[] valueBytes = value.getBytes(UTF_8);
+ dataOutputStream.writeInt(key);
+ dataOutputStream.writeInt(valueBytes.length);
+ dataOutputStream.write(valueBytes);
+ }
+
+ return byteArrayOutputStream.toByteArray();
+ }
+
+ private Map<Integer, String> deserializeExceptions(ByteBuffer buffer)
+ throws IOException {
+ int numExceptions = buffer.getInt();
+ Map<Integer, String> exceptions = new HashMap<>(numExceptions);
+ for (int i = 0; i < numExceptions; i++) {
+ int errCode = buffer.getInt();
+ String errMessage = DataTableUtils.decodeString(buffer);
+ exceptions.put(errCode, errMessage);
+ }
+ return exceptions;
+ }
+
+ @Override
+ public String toString() {
+ if (_dataSchema == null) {
+ return _metadata.toString();
+ }
+
+ StringBuilder stringBuilder = new StringBuilder();
+ stringBuilder.append(_dataSchema.toString()).append('\n');
+ stringBuilder.append("numRows: ").append(_numRows).append('\n');
+
+ DataSchema.ColumnDataType[] storedColumnDataTypes = _dataSchema.getStoredColumnDataTypes();
+ _fixedSizeData.position(0);
+ for (int rowId = 0; rowId < _numRows; rowId++) {
+ for (int colId = 0; colId < _numColumns; colId++) {
+ switch (storedColumnDataTypes[colId]) {
+ case INT:
+ stringBuilder.append(_fixedSizeData.getInt());
+ break;
+ case LONG:
+ stringBuilder.append(_fixedSizeData.getLong());
+ break;
+ case FLOAT:
+ stringBuilder.append(_fixedSizeData.getFloat());
+ break;
+ case DOUBLE:
+ stringBuilder.append(_fixedSizeData.getDouble());
+ break;
+ case STRING:
+ stringBuilder.append(_fixedSizeData.getInt());
+ break;
+ // Object and array.
+ default:
+ stringBuilder.append(String.format("(%s:%s)", _fixedSizeData.getInt(), _fixedSizeData.getInt()));
+ break;
+ }
+ stringBuilder.append("\t");
+ }
+ stringBuilder.append("\n");
+ }
+ return stringBuilder.toString();
+ }
+
+ public enum Type {
+ ROW(0),
+ COLUMNAR(1),
+ METADATA(2);
+
+ private final int _ordinal;
+
+ Type(int ordinal) {
+ _ordinal = ordinal;
+ }
+
+ public static Type fromOrdinal(int ordinal) {
+ switch (ordinal) {
+ case 0:
+ return ROW;
+ case 1:
+ return COLUMNAR;
+ case 2:
+ return METADATA;
+ default:
+ throw new IllegalArgumentException("Invalid ordinal: " + ordinal);
+ }
+ }
+ }
+}
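
The version/type packing used by getDataBlockVersionType() is plain bit arithmetic; a standalone sketch of the encode/decode round trip (demo class name is hypothetical, VERSION_TYPE_SHIFT = 5 as in DataBlockUtils):

    public final class VersionTypeDemo {
      private static final int VERSION_TYPE_SHIFT = 5; // low 5 bits: version; high bits: type ordinal

      private VersionTypeDemo() {
      }

      static int encode(int version, int typeOrdinal) {
        return version + (typeOrdinal << VERSION_TYPE_SHIFT);
      }

      public static void main(String[] args) {
        int versionType = encode(1, 1); // VERSION = 1, Type.COLUMNAR ordinal = 1 -> 33
        int version = versionType & ((1 << VERSION_TYPE_SHIFT) - 1); // -> 1
        int typeOrdinal = versionType >> VERSION_TYPE_SHIFT; // -> 1 (COLUMNAR)
        System.out.println(version + " / " + typeOrdinal);
      }
    }
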
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/blocks/ColumnarDataBlock.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/blocks/ColumnarDataBlock.java
new file mode 100644
index 0000000000..8510043251
--- /dev/null
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/blocks/ColumnarDataBlock.java
@@ -0,0 +1,96 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.query.runtime.blocks;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Map;
+import org.apache.pinot.common.utils.DataSchema;
+
+
+/**
+ * Column-wise data block. It stores data in column-major format.
+ */
+public class ColumnarDataBlock extends BaseDataBlock {
+ private static final int VERSION = 1;
+ protected int[] _cumulativeColumnOffsetSizeInBytes;
+ protected int[] _columnSizeInBytes;
+
+ public ColumnarDataBlock() {
+ super();
+ }
+
+ public ColumnarDataBlock(int numRows, DataSchema dataSchema, Map<String, Map<Integer, String>> dictionaryMap,
+ byte[] fixedSizeDataBytes, byte[] variableSizeDataBytes) {
+ super(numRows, dataSchema, dictionaryMap, fixedSizeDataBytes, variableSizeDataBytes);
+ computeBlockObjectConstants();
+ }
+
+ public ColumnarDataBlock(ByteBuffer byteBuffer)
+ throws IOException {
+ super(byteBuffer);
+ computeBlockObjectConstants();
+ }
+
+ protected void computeBlockObjectConstants() {
+ if (_dataSchema != null) {
+ _cumulativeColumnOffsetSizeInBytes = new int[_numColumns];
+ _columnSizeInBytes = new int[_numColumns];
+ DataBlockUtils.computeColumnSizeInBytes(_dataSchema, _columnSizeInBytes);
+ int cumulativeColumnOffset = 0;
+ for (int i = 0; i < _numColumns; i++) {
+ _cumulativeColumnOffsetSizeInBytes[i] = cumulativeColumnOffset;
+ cumulativeColumnOffset += _columnSizeInBytes[i] * _numRows;
+ }
+ }
+ }
+
+ @Override
+ protected int getDataBlockVersionType() {
+ return VERSION + (Type.COLUMNAR.ordinal() << DataBlockUtils.VERSION_TYPE_SHIFT);
+ }
+
+ @Override
+ protected void positionCursorInFixSizedBuffer(int rowId, int colId) {
+ int position = _cumulativeColumnOffsetSizeInBytes[colId] + _columnSizeInBytes[colId] * rowId;
+ _fixedSizeData.position(position);
+ }
+
+ @Override
+ protected int positionCursorInVariableBuffer(int rowId, int colId) {
+ positionCursorInFixSizedBuffer(rowId, colId);
+ _variableSizeData.position(_fixedSizeData.getInt());
+ return _fixedSizeData.getInt();
+ }
+
+ @Override
+ public ColumnarDataBlock toMetadataOnlyDataTable() {
+ ColumnarDataBlock metadataOnlyDataTable = new ColumnarDataBlock();
+ metadataOnlyDataTable._metadata.putAll(_metadata);
+ metadataOnlyDataTable._errCodeToExceptionMap.putAll(_errCodeToExceptionMap);
+ return metadataOnlyDataTable;
+ }
+
+ @Override
+ public ColumnarDataBlock toDataOnlyDataTable() {
+ return new ColumnarDataBlock(_numRows, _dataSchema, _dictionaryMap, _fixedSizeDataBytes, _variableSizeDataBytes);
+ }
+
+ // TODO: add whole-column access methods.
+}
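
The cursor positioning above reduces to one offset computation per cell: column start plus cell size times row ID. A self-contained sketch with hypothetical column sizes (columns [INT, LONG, INT], three rows):

    public final class ColumnarAddressingDemo {
      public static void main(String[] args) {
        // Hypothetical layout: columns [INT, LONG, INT], 3 rows.
        int numRows = 3;
        int[] columnSizeInBytes = {4, 8, 4};
        int[] cumulativeColumnOffsetSizeInBytes = new int[columnSizeInBytes.length];
        int cumulativeColumnOffset = 0;
        for (int i = 0; i < columnSizeInBytes.length; i++) {
          cumulativeColumnOffsetSizeInBytes[i] = cumulativeColumnOffset;
          cumulativeColumnOffset += columnSizeInBytes[i] * numRows;
        }
        // Cell (rowId = 2, colId = 1): column start 12 + 8 * 2 = 28.
        int position = cumulativeColumnOffsetSizeInBytes[1] + columnSizeInBytes[1] * 2;
        System.out.println(position); // 28
      }
    }
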
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/blocks/DataBlockBuilder.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/blocks/DataBlockBuilder.java
new file mode 100644
index 0000000000..0c456c1d85
--- /dev/null
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/blocks/DataBlockBuilder.java
@@ -0,0 +1,406 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.query.runtime.blocks;
+
+import java.io.ByteArrayOutputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.math.BigDecimal;
+import java.nio.ByteBuffer;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.pinot.common.utils.DataSchema;
+import org.apache.pinot.core.common.ObjectSerDeUtils;
+import org.apache.pinot.core.common.datatable.DataTableUtils;
+import org.apache.pinot.spi.utils.ArrayCopyUtils;
+import org.apache.pinot.spi.utils.BigDecimalUtils;
+import org.apache.pinot.spi.utils.ByteArray;
+
+
+public class DataBlockBuilder {
+ private final DataSchema _dataSchema;
+ private final BaseDataBlock.Type _blockType;
+ private final DataSchema.ColumnDataType[] _columnDataType;
+
+ private int[] _columnOffsets;
+ private int _rowSizeInBytes;
+ private int[] _cumulativeColumnOffsetSizeInBytes;
+ private int[] _columnSizeInBytes;
+
+ private int _numRows;
+ private int _numColumns;
+
+ private final Map<String, Map<String, Integer>> _dictionaryMap = new HashMap<>();
+ private final Map<String, Map<Integer, String>> _reverseDictionaryMap = new HashMap<>();
+ private final ByteArrayOutputStream _fixedSizeDataByteArrayOutputStream = new ByteArrayOutputStream();
+ private final ByteArrayOutputStream _variableSizeDataByteArrayOutputStream = new ByteArrayOutputStream();
+ private final DataOutputStream _variableSizeDataOutputStream =
+ new DataOutputStream(_variableSizeDataByteArrayOutputStream);
+
+
+ private ByteBuffer _currentRowDataByteBuffer;
+
+ private DataBlockBuilder(DataSchema dataSchema, BaseDataBlock.Type blockType) {
+ _dataSchema = dataSchema;
+ _columnDataType = dataSchema.getStoredColumnDataTypes();
+ _blockType = blockType;
+ _numColumns = dataSchema.size();
+ if (_blockType == BaseDataBlock.Type.COLUMNAR) {
+ _cumulativeColumnOffsetSizeInBytes = new int[_numColumns];
+ _columnSizeInBytes = new int[_numColumns];
+ DataBlockUtils.computeColumnSizeInBytes(_dataSchema, _columnSizeInBytes);
+ int cumulativeColumnOffset = 0;
+ for (int i = 0; i < _numColumns; i++) {
+ _cumulativeColumnOffsetSizeInBytes[i] = cumulativeColumnOffset;
+ cumulativeColumnOffset += _columnSizeInBytes[i] * _numRows;
+ }
+ } else if (_blockType == BaseDataBlock.Type.ROW) {
+ _columnOffsets = new int[_numColumns];
+ _rowSizeInBytes = DataTableUtils.computeColumnOffsets(dataSchema, _columnOffsets);
+ }
+ }
+
+ public static RowDataBlock buildFromRows(List<Object[]> rows, DataSchema dataSchema)
+ throws IOException {
+ DataBlockBuilder rowBuilder = new DataBlockBuilder(dataSchema, BaseDataBlock.Type.ROW);
+ rowBuilder._numRows = rows.size();
+ for (Object[] row : rows) {
+ ByteBuffer byteBuffer = ByteBuffer.allocate(rowBuilder._rowSizeInBytes);
+ for (int i = 0; i < rowBuilder._numColumns; i++) {
+ Object value = row[i];
+ switch (rowBuilder._columnDataType[i]) {
+ // Single-value column
+ case INT:
+ byteBuffer.putInt(((Number) value).intValue());
+ break;
+ case LONG:
+ byteBuffer.putLong(((Number) value).longValue());
+ break;
+ case FLOAT:
+ byteBuffer.putFloat(((Number) value).floatValue());
+ break;
+ case DOUBLE:
+ byteBuffer.putDouble(((Number) value).doubleValue());
+ break;
+ case BIG_DECIMAL:
+ setColumn(rowBuilder, byteBuffer, (BigDecimal) value);
+ break;
+ case STRING:
+ setColumn(rowBuilder, byteBuffer, i, (String) value);
+ break;
+ case BYTES:
+ setColumn(rowBuilder, byteBuffer, (ByteArray) value);
+ break;
+ case OBJECT:
+ setColumn(rowBuilder, byteBuffer, value);
+ break;
+ // Multi-value column
+ case BOOLEAN_ARRAY:
+ case INT_ARRAY:
+ setColumn(rowBuilder, byteBuffer, (int[]) value);
+ break;
+ case TIMESTAMP_ARRAY:
+ case LONG_ARRAY:
+ // LONG_ARRAY type covers INT_ARRAY and LONG_ARRAY
+ if (value instanceof int[]) {
+ int[] ints = (int[]) value;
+ int length = ints.length;
+ long[] longs = new long[length];
+ ArrayCopyUtils.copy(ints, longs, length);
+ setColumn(rowBuilder, byteBuffer, longs);
+ } else {
+ setColumn(rowBuilder, byteBuffer, (long[]) value);
+ }
+ break;
+ case FLOAT_ARRAY:
+ setColumn(rowBuilder, byteBuffer, (float[]) value);
+ break;
+ case DOUBLE_ARRAY:
+ // DOUBLE_ARRAY type covers INT_ARRAY, LONG_ARRAY, FLOAT_ARRAY and DOUBLE_ARRAY
+ if (value instanceof int[]) {
+ int[] ints = (int[]) value;
+ int length = ints.length;
+ double[] doubles = new double[length];
+ ArrayCopyUtils.copy(ints, doubles, length);
+ setColumn(rowBuilder, byteBuffer, doubles);
+ } else if (value instanceof long[]) {
+ long[] longs = (long[]) value;
+ int length = longs.length;
+ double[] doubles = new double[length];
+ ArrayCopyUtils.copy(longs, doubles, length);
+ setColumn(rowBuilder, byteBuffer, doubles);
+ } else if (value instanceof float[]) {
+ float[] floats = (float[]) value;
+ int length = floats.length;
+ double[] doubles = new double[length];
+ ArrayCopyUtils.copy(floats, doubles, length);
+ setColumn(rowBuilder, byteBuffer, doubles);
+ } else {
+ setColumn(rowBuilder, byteBuffer, (double[]) value);
+ }
+ break;
+ case BYTES_ARRAY:
+ case STRING_ARRAY:
+ setColumn(rowBuilder, byteBuffer, i, (String[]) value);
+ break;
+ default:
+ throw new IllegalStateException(String.format(
+ "Unsupported data type: %s for column: %s", rowBuilder._columnDataType[i],
+ rowBuilder._dataSchema.getColumnName(i)));
+ }
+ }
+ rowBuilder._fixedSizeDataByteArrayOutputStream.write(byteBuffer.array(), 0, byteBuffer.position());
+ }
+ return buildRowBlock(rowBuilder);
+ }
+
+ public static ColumnarDataBlock buildFromColumns(List<Object[]> columns, DataSchema dataSchema)
+ throws IOException {
+ DataBlockBuilder columnarBuilder = new DataBlockBuilder(dataSchema, BaseDataBlock.Type.COLUMNAR);
+ for (int i = 0; i < columns.size(); i++) {
+ Object[] column = columns.get(i);
+ columnarBuilder._numRows = column.length;
+ ByteBuffer byteBuffer = ByteBuffer.allocate(columnarBuilder._numRows * columnarBuilder._columnSizeInBytes[i]);
+ switch (columnarBuilder._columnDataType[i]) {
+ // Single-value column
+ case INT:
+ for (Object value : column) {
+ byteBuffer.putInt(((Number) value).intValue());
+ }
+ break;
+ case LONG:
+ for (Object value : column) {
+ byteBuffer.putLong(((Number) value).longValue());
+ }
+ break;
+ case FLOAT:
+ for (Object value : column) {
+ byteBuffer.putFloat(((Number) value).floatValue());
+ }
+ break;
+ case DOUBLE:
+ for (Object value : column) {
+ byteBuffer.putDouble(((Number) value).doubleValue());
+ }
+ break;
+ case BIG_DECIMAL:
+ for (Object value : column) {
+ setColumn(columnarBuilder, byteBuffer, (BigDecimal) value);
+ }
+ break;
+ case STRING:
+ for (Object value : column) {
+ setColumn(columnarBuilder, byteBuffer, i, (String) value);
+ }
+ break;
+ case BYTES:
+ for (Object value : column) {
+ setColumn(columnarBuilder, byteBuffer, (ByteArray) value);
+ }
+ break;
+ case OBJECT:
+ for (Object value : column) {
+ setColumn(columnarBuilder, byteBuffer, value);
+ }
+ break;
+ // Multi-value column
+ case BOOLEAN_ARRAY:
+ case INT_ARRAY:
+ for (Object value : column) {
+ setColumn(columnarBuilder, byteBuffer, (int[]) value);
+ }
+ break;
+ case TIMESTAMP_ARRAY:
+ case LONG_ARRAY:
+ for (Object value : column) {
+ if (value instanceof int[]) {
+ // LONG_ARRAY type covers INT_ARRAY and LONG_ARRAY
+ int[] ints = (int[]) value;
+ int length = ints.length;
+ long[] longs = new long[length];
+ ArrayCopyUtils.copy(ints, longs, length);
+ setColumn(columnarBuilder, byteBuffer, longs);
+ } else {
+ setColumn(columnarBuilder, byteBuffer, (long[]) value);
+ }
+ }
+ break;
+ case FLOAT_ARRAY:
+ for (Object value : column) {
+ setColumn(columnarBuilder, byteBuffer, (float[]) value);
+ }
+ break;
+ case DOUBLE_ARRAY:
+ for (Object value : column) {
+ // DOUBLE_ARRAY type covers INT_ARRAY, LONG_ARRAY, FLOAT_ARRAY and DOUBLE_ARRAY
+ if (value instanceof int[]) {
+ int[] ints = (int[]) value;
+ int length = ints.length;
+ double[] doubles = new double[length];
+ ArrayCopyUtils.copy(ints, doubles, length);
+ setColumn(columnarBuilder, byteBuffer, doubles);
+ } else if (value instanceof long[]) {
+ long[] longs = (long[]) value;
+ int length = longs.length;
+ double[] doubles = new double[length];
+ ArrayCopyUtils.copy(longs, doubles, length);
+ setColumn(columnarBuilder, byteBuffer, doubles);
+ } else if (value instanceof float[]) {
+ float[] floats = (float[]) value;
+ int length = floats.length;
+ double[] doubles = new double[length];
+ ArrayCopyUtils.copy(floats, doubles, length);
+ setColumn(columnarBuilder, byteBuffer, doubles);
+ } else {
+ setColumn(columnarBuilder, byteBuffer, (double[]) value);
+ }
+ }
+ break;
+ case BYTES_ARRAY:
+ case STRING_ARRAY:
+ for (Object value : column) {
+ setColumn(columnarBuilder, byteBuffer, i, (String[]) value);
+ }
+ break;
+ default:
+ throw new IllegalStateException(String.format(
+ "Unsupported data type: %s for column: %s", columnarBuilder._columnDataType[i],
+ columnarBuilder._dataSchema.getColumnName(i)));
+ }
+ columnarBuilder._fixedSizeDataByteArrayOutputStream.write(byteBuffer.array(), 0, byteBuffer.position());
+ }
+ return buildColumnarBlock(columnarBuilder);
+ }
+
+ private static RowDataBlock buildRowBlock(DataBlockBuilder builder) {
+ return new RowDataBlock(builder._numRows, builder._dataSchema, builder._reverseDictionaryMap,
+ builder._fixedSizeDataByteArrayOutputStream.toByteArray(),
+ builder._variableSizeDataByteArrayOutputStream.toByteArray());
+ }
+
+ private static ColumnarDataBlock buildColumnarBlock(DataBlockBuilder builder) {
+ return new ColumnarDataBlock(builder._numRows, builder._dataSchema, builder._reverseDictionaryMap,
+ builder._fixedSizeDataByteArrayOutputStream.toByteArray(),
+ builder._variableSizeDataByteArrayOutputStream.toByteArray());
+ }
+
+ private static void setColumn(DataBlockBuilder builder, ByteBuffer byteBuffer, BigDecimal value)
+ throws IOException {
+ byteBuffer.putInt(builder._variableSizeDataByteArrayOutputStream.size());
+ byte[] bytes = BigDecimalUtils.serialize(value);
+ byteBuffer.putInt(bytes.length);
+ builder._variableSizeDataByteArrayOutputStream.write(bytes);
+ }
+
+ private static void setColumn(DataBlockBuilder builder, ByteBuffer byteBuffer, int colId, String value) {
+ String columnName = builder._dataSchema.getColumnName(colId);
+ Map<String, Integer> dictionary = builder._dictionaryMap.get(columnName);
+ if (dictionary == null) {
+ dictionary = new HashMap<>();
+ builder._dictionaryMap.put(columnName, dictionary);
+ builder._reverseDictionaryMap.put(columnName, new HashMap<>());
+ }
+ Integer dictId = dictionary.get(value);
+ if (dictId == null) {
+ dictId = dictionary.size();
+ dictionary.put(value, dictId);
+ builder._reverseDictionaryMap.get(columnName).put(dictId, value);
+ }
+ byteBuffer.putInt(dictId);
+ }
+
+ private static void setColumn(DataBlockBuilder builder, ByteBuffer byteBuffer, ByteArray value)
+ throws IOException {
+ byteBuffer.putInt(builder._variableSizeDataByteArrayOutputStream.size());
+ byte[] bytes = value.getBytes();
+ byteBuffer.putInt(bytes.length);
+ builder._variableSizeDataByteArrayOutputStream.write(bytes);
+ }
+
+ private static void setColumn(DataBlockBuilder builder, ByteBuffer byteBuffer, Object value)
+ throws IOException {
+ byteBuffer.putInt(builder._variableSizeDataByteArrayOutputStream.size());
+ int objectTypeValue = ObjectSerDeUtils.ObjectType.getObjectType(value).getValue();
+ byte[] bytes = ObjectSerDeUtils.serialize(value, objectTypeValue);
+ byteBuffer.putInt(bytes.length);
+ builder._variableSizeDataOutputStream.writeInt(objectTypeValue);
+ builder._variableSizeDataByteArrayOutputStream.write(bytes);
+ }
+
+ private static void setColumn(DataBlockBuilder builder, ByteBuffer byteBuffer, int[] values)
+ throws IOException {
+ byteBuffer.putInt(builder._variableSizeDataByteArrayOutputStream.size());
+ byteBuffer.putInt(values.length);
+ for (int value : values) {
+ builder._variableSizeDataOutputStream.writeInt(value);
+ }
+ }
+
+ private static void setColumn(DataBlockBuilder builder, ByteBuffer byteBuffer, long[] values)
+ throws IOException {
+ byteBuffer.putInt(builder._variableSizeDataByteArrayOutputStream.size());
+ byteBuffer.putInt(values.length);
+ for (long value : values) {
+ builder._variableSizeDataOutputStream.writeLong(value);
+ }
+ }
+
+ private static void setColumn(DataBlockBuilder builder, ByteBuffer byteBuffer, float[] values)
+ throws IOException {
+ byteBuffer.putInt(builder._variableSizeDataByteArrayOutputStream.size());
+ byteBuffer.putInt(values.length);
+ for (float value : values) {
+ builder._variableSizeDataOutputStream.writeFloat(value);
+ }
+ }
+
+ private static void setColumn(DataBlockBuilder builder, ByteBuffer byteBuffer, double[] values)
+ throws IOException {
+ byteBuffer.putInt(builder._variableSizeDataByteArrayOutputStream.size());
+ byteBuffer.putInt(values.length);
+ for (double value : values) {
+ builder._variableSizeDataOutputStream.writeDouble(value);
+ }
+ }
+
+ private static void setColumn(DataBlockBuilder builder, ByteBuffer byteBuffer, int colId, String[] values)
+ throws IOException {
+ byteBuffer.putInt(builder._variableSizeDataByteArrayOutputStream.size());
+ byteBuffer.putInt(values.length);
+
+ String columnName = builder._dataSchema.getColumnName(colId);
+ Map<String, Integer> dictionary = builder._dictionaryMap.get(columnName);
+ if (dictionary == null) {
+ dictionary = new HashMap<>();
+ builder._dictionaryMap.put(columnName, dictionary);
+ builder._reverseDictionaryMap.put(columnName, new HashMap<>());
+ }
+
+ for (String value : values) {
+ Integer dictId = dictionary.get(value);
+ if (dictId == null) {
+ dictId = dictionary.size();
+ dictionary.put(value, dictId);
+ builder._reverseDictionaryMap.get(columnName).put(dictId, value);
+ }
+ builder._variableSizeDataOutputStream.writeInt(dictId);
+ }
+ }
+}
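
A usage sketch for the builder (the schema and rows are made up for illustration; the builder API is the one introduced above):

    import java.io.IOException;
    import java.util.Arrays;
    import java.util.List;
    import org.apache.pinot.common.utils.DataSchema;
    import org.apache.pinot.query.runtime.blocks.DataBlockBuilder;
    import org.apache.pinot.query.runtime.blocks.RowDataBlock;

    final class BuilderDemo {
      private BuilderDemo() {
      }

      static RowDataBlock buildSample()
          throws IOException {
        DataSchema schema = new DataSchema(new String[]{"id", "name"},
            new DataSchema.ColumnDataType[]{
                DataSchema.ColumnDataType.INT, DataSchema.ColumnDataType.STRING});
        List<Object[]> rows = Arrays.asList(new Object[]{1, "foo"}, new Object[]{2, "bar"});
        // Row-major layout; the STRING column is dictionary-encoded, so the
        // fixed-size buffer stores a 4-byte dictionary ID per row.
        return DataBlockBuilder.buildFromRows(rows, schema);
      }
    }
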
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/blocks/DataBlockUtils.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/blocks/DataBlockUtils.java
new file mode 100644
index 0000000000..690be1353a
--- /dev/null
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/blocks/DataBlockUtils.java
@@ -0,0 +1,174 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.query.runtime.blocks;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import org.apache.pinot.common.exception.QueryException;
+import org.apache.pinot.common.response.ProcessingException;
+import org.apache.pinot.common.utils.DataSchema;
+import org.apache.pinot.common.utils.DataTable;
+
+
+public final class DataBlockUtils {
+ protected static final int VERSION_TYPE_SHIFT = 5;
+ private DataBlockUtils() {
+ // do not instantiate.
+ }
+
+ private static final DataSchema EMPTY_SCHEMA = new DataSchema(new String[0], new DataSchema.ColumnDataType[0]);
+ private static final MetadataBlock EOS_DATA_BLOCK = new MetadataBlock(EMPTY_SCHEMA);
+ static {
+ EOS_DATA_BLOCK._metadata.put(DataTable.MetadataKey.TABLE.getName(), "END_OF_STREAM");
+ }
+ private static final TransferableBlock EOS_TRANSFERABLE_BLOCK = new TransferableBlock(EOS_DATA_BLOCK);
+
+ public static TransferableBlock getEndOfStreamTransferableBlock() {
+ return EOS_TRANSFERABLE_BLOCK;
+ }
+
+ public static MetadataBlock getEndOfStreamDataBlock() {
+ return EOS_DATA_BLOCK;
+ }
+
+ public static MetadataBlock getErrorDataBlock(Exception e) {
+ MetadataBlock errorBlock = new MetadataBlock(EMPTY_SCHEMA);
+ errorBlock._metadata.put(DataTable.MetadataKey.TABLE.getName(), "ERROR");
+ if (e instanceof ProcessingException) {
+ errorBlock.addException(((ProcessingException) e).getErrorCode(), e.getMessage());
+ } else {
+ errorBlock.addException(QueryException.UNKNOWN_ERROR_CODE, e.getMessage());
+ }
+ return errorBlock;
+ }
+
+ public static TransferableBlock getErrorTransferableBlock(Exception e) {
+ return new TransferableBlock(getErrorDataBlock(e));
+ }
+
+ public static MetadataBlock getEmptyDataBlock(DataSchema dataSchema) {
+ return dataSchema == null ? EOS_DATA_BLOCK : new MetadataBlock(dataSchema);
+ }
+
+ public static TransferableBlock getEmptyTransferableBlock(DataSchema dataSchema) {
+ return new TransferableBlock(getEmptyDataBlock(dataSchema));
+ }
+
+ public static boolean isEndOfStream(TransferableBlock transferableBlock) {
+ return transferableBlock.getType().equals(BaseDataBlock.Type.METADATA)
+ && "END_OF_STREAM".equals(transferableBlock.getDataBlock().getMetadata()
+ .get(DataTable.MetadataKey.TABLE.getName()));
+ }
+
+ public static BaseDataBlock getDataBlock(ByteBuffer byteBuffer)
+ throws IOException {
+ int versionType = byteBuffer.getInt();
+ int version = versionType & ((1 << VERSION_TYPE_SHIFT) - 1);
+ BaseDataBlock.Type type = BaseDataBlock.Type.fromOrdinal(versionType >> VERSION_TYPE_SHIFT);
+ switch (type) {
+ case COLUMNAR:
+ return new ColumnarDataBlock(byteBuffer);
+ case ROW:
+ return new RowDataBlock(byteBuffer);
+ case METADATA:
+ return new MetadataBlock(byteBuffer);
+ default:
+ throw new UnsupportedOperationException("Unsupported data table version: " + version + " with type: " + type);
+ }
+ }
+
+ /**
+   * Given a {@link DataSchema}, compute each column's offset, fill them into the passed-in array, and return the
+ * row size in bytes.
+ *
+ * @param dataSchema data schema.
+ * @param columnOffsets array of column offsets.
+ * @return row size in bytes.
+ */
+ public static int computeColumnOffsets(DataSchema dataSchema, int[] columnOffsets) {
+ int numColumns = columnOffsets.length;
+ assert numColumns == dataSchema.size();
+
+ DataSchema.ColumnDataType[] storedColumnDataTypes = dataSchema.getStoredColumnDataTypes();
+ int rowSizeInBytes = 0;
+ for (int i = 0; i < numColumns; i++) {
+ columnOffsets[i] = rowSizeInBytes;
+ switch (storedColumnDataTypes[i]) {
+ case INT:
+ rowSizeInBytes += 4;
+ break;
+ case LONG:
+ rowSizeInBytes += 8;
+ break;
+ case FLOAT:
+ rowSizeInBytes += 4;
+ break;
+ case DOUBLE:
+ rowSizeInBytes += 8;
+ break;
+ case STRING:
+ rowSizeInBytes += 4;
+ break;
+ // Object and array. (POSITION|LENGTH)
+ default:
+ rowSizeInBytes += 8;
+ break;
+ }
+ }
+
+ return rowSizeInBytes;
+ }
+
+ /**
+   * Given a {@link DataSchema}, compute each column's fixed-width size in bytes and fill it into the passed-in
+   * array.
+   *
+   * @param dataSchema data schema.
+   * @param columnSizes array to be filled with each column's size in bytes.
+ */
+ public static void computeColumnSizeInBytes(DataSchema dataSchema, int[] columnSizes) {
+ int numColumns = columnSizes.length;
+ assert numColumns == dataSchema.size();
+
+ DataSchema.ColumnDataType[] storedColumnDataTypes = dataSchema.getStoredColumnDataTypes();
+ for (int i = 0; i < numColumns; i++) {
+ switch (storedColumnDataTypes[i]) {
+ case INT:
+ columnSizes[i] = 4;
+ break;
+ case LONG:
+ columnSizes[i] = 8;
+ break;
+ case FLOAT:
+ columnSizes[i] = 4;
+ break;
+ case DOUBLE:
+ columnSizes[i] = 8;
+ break;
+ case STRING:
+ columnSizes[i] = 4;
+ break;
+ // Object and array. (POSITION|LENGTH)
+ default:
+ columnSizes[i] = 8;
+ break;
+ }
+ }
+ }
+}
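
A sketch of how an operator might consume these helpers to signal termination or failure (the operator shape is hypothetical; the DataBlockUtils calls are from this file):

    import org.apache.pinot.query.runtime.blocks.DataBlockUtils;
    import org.apache.pinot.query.runtime.blocks.TransferableBlock;

    final class StreamTerminationDemo {
      private StreamTerminationDemo() {
      }

      static TransferableBlock terminalBlock(Exception failure) {
        if (failure != null) {
          // The error code/message travel in the block's exceptions section.
          return DataBlockUtils.getErrorTransferableBlock(failure);
        }
        // Metadata block tagged END_OF_STREAM; receivers detect it via
        // DataBlockUtils.isEndOfStream(TransferableBlock).
        return DataBlockUtils.getEndOfStreamTransferableBlock();
      }
    }
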
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/blocks/DataTableBlockUtils.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/blocks/DataTableBlockUtils.java
deleted file mode 100644
index c429e2e014..0000000000
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/blocks/DataTableBlockUtils.java
+++ /dev/null
@@ -1,71 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pinot.query.runtime.blocks;
-
-import org.apache.pinot.common.exception.QueryException;
-import org.apache.pinot.common.utils.DataSchema;
-import org.apache.pinot.common.utils.DataTable;
-import org.apache.pinot.core.common.datatable.DataTableBuilder;
-
-
-public final class DataTableBlockUtils {
- private DataTableBlockUtils() {
- // do not instantiate.
- }
-
- // used to indicate a datatable block status
- private static final DataSchema EMPTY_SCHEMA = new DataSchema(new String[0], new DataSchema.ColumnDataType[0]);
- private static final DataTable EMPTY_DATATABLE = new DataTableBuilder(EMPTY_SCHEMA).build();
- private static final DataTableBlock END_OF_STREAM_DATATABLE_BLOCK = new DataTableBlock(EMPTY_DATATABLE);
-
- public static DataTableBlock getEndOfStreamDataTableBlock() {
- return END_OF_STREAM_DATATABLE_BLOCK;
- }
-
- public static DataTable getEndOfStreamDataTable() {
- return EMPTY_DATATABLE;
- }
-
- public static DataTable getErrorDataTable(Exception e) {
- DataTable errorDataTable = new DataTableBuilder(EMPTY_SCHEMA).build();
- errorDataTable.addException(QueryException.UNKNOWN_ERROR_CODE, e.getMessage());
- return errorDataTable;
- }
-
- public static DataTableBlock getErrorDatatableBlock(Exception e) {
- return new DataTableBlock(getErrorDataTable(e));
- }
-
- public static DataTable getEmptyDataTable(DataSchema dataSchema) {
- if (dataSchema != null) {
- return new DataTableBuilder(dataSchema).build();
- } else {
- return EMPTY_DATATABLE;
- }
- }
-
- public static DataTableBlock getEmptyDataTableBlock(DataSchema dataSchema) {
- return new DataTableBlock(getEmptyDataTable(dataSchema));
- }
-
- public static boolean isEndOfStream(DataTableBlock dataTableBlock) {
- DataSchema dataSchema = dataTableBlock.getDataTable().getDataSchema();
- return dataSchema.getColumnNames().length == 0 && dataSchema.getColumnDataTypes().length == 0;
- }
-}
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/blocks/MetadataBlock.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/blocks/MetadataBlock.java
new file mode 100644
index 0000000000..a3ffc7a126
--- /dev/null
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/blocks/MetadataBlock.java
@@ -0,0 +1,66 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.query.runtime.blocks;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Collections;
+import org.apache.pinot.common.utils.DataSchema;
+
+
+/**
+ * A metadata-only data block: it carries the data schema, block metadata, and exceptions, but no row payload.
+ */
+public class MetadataBlock extends BaseDataBlock {
+ private static final int VERSION = 1;
+
+ public MetadataBlock(DataSchema dataSchema) {
+ super(0, dataSchema, Collections.emptyMap(), new byte[]{0}, new byte[]{0});
+ }
+
+ public MetadataBlock(ByteBuffer byteBuffer)
+ throws IOException {
+ super(byteBuffer);
+ }
+
+ @Override
+ protected int getDataBlockVersionType() {
+ return VERSION + (Type.METADATA.ordinal() << DataBlockUtils.VERSION_TYPE_SHIFT);
+ }
+
+ @Override
+ protected void positionCursorInFixSizedBuffer(int rowId, int colId) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ protected int positionCursorInVariableBuffer(int rowId, int colId) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public MetadataBlock toMetadataOnlyDataTable() {
+ return this;
+ }
+
+ @Override
+ public MetadataBlock toDataOnlyDataTable() {
+ return new MetadataBlock(_dataSchema);
+ }
+}
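
Both MetadataBlock and RowDataBlock pack the serialization version and the block type into a single int: the version sits in the low bits and the type ordinal is shifted into the high bits by DataBlockUtils.VERSION_TYPE_SHIFT. A minimal, self-contained sketch of that packing, assuming a shift of 5 and illustrative enum ordinals (the actual constant and enum order live in DataBlockUtils and BaseDataBlock.Type, which are not shown in this diff):

    public class VersionTypeSketch {
      enum Type { ROW, COLUMNAR, METADATA }     // illustrative ordinals only

      static final int VERSION_TYPE_SHIFT = 5;  // assumption: low 5 bits hold the version

      static int encode(int version, Type type) {
        return version + (type.ordinal() << VERSION_TYPE_SHIFT);
      }

      public static void main(String[] args) {
        int versionType = encode(1, Type.METADATA);
        int version = versionType & ((1 << VERSION_TYPE_SHIFT) - 1);   // unpack low bits
        Type type = Type.values()[versionType >>> VERSION_TYPE_SHIFT]; // unpack high bits
        System.out.println(version + " " + type);                      // prints: 1 METADATA
      }
    }

Presumably this header int is what lets a deserializer dispatch a raw buffer to the right concrete block class.
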
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/blocks/RowDataBlock.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/blocks/RowDataBlock.java
new file mode 100644
index 0000000000..f8520c54f6
--- /dev/null
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/blocks/RowDataBlock.java
@@ -0,0 +1,90 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.query.runtime.blocks;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Map;
+import org.apache.pinot.common.utils.DataSchema;
+
+
+/**
+ * Wrapper for row-wise data table. It stores data in row-major format.
+ */
+public class RowDataBlock extends BaseDataBlock {
+ private static final int VERSION = 1;
+ protected int[] _columnOffsets;
+ protected int _rowSizeInBytes;
+
+ public RowDataBlock() {
+ super();
+ }
+
+ public RowDataBlock(int numRows, DataSchema dataSchema, Map<String, Map<Integer, String>> dictionaryMap,
+ byte[] fixedSizeDataBytes, byte[] variableSizeDataBytes) {
+ super(numRows, dataSchema, dictionaryMap, fixedSizeDataBytes, variableSizeDataBytes);
+ computeBlockObjectConstants();
+ }
+
+ public RowDataBlock(ByteBuffer byteBuffer)
+ throws IOException {
+ super(byteBuffer);
+ computeBlockObjectConstants();
+ }
+
+ protected void computeBlockObjectConstants() {
+ if (_dataSchema != null) {
+ _columnOffsets = new int[_numColumns];
+ _rowSizeInBytes = DataBlockUtils.computeColumnOffsets(_dataSchema, _columnOffsets);
+ }
+ }
+
+ @Override
+ protected int getDataBlockVersionType() {
+ return VERSION + (Type.ROW.ordinal() << DataBlockUtils.VERSION_TYPE_SHIFT);
+ }
+
+ @Override
+ protected void positionCursorInFixSizedBuffer(int rowId, int colId) {
+ int position = rowId * _rowSizeInBytes + _columnOffsets[colId];
+ _fixedSizeData.position(position);
+ }
+
+ @Override
+ protected int positionCursorInVariableBuffer(int rowId, int colId) {
+ positionCursorInFixSizedBuffer(rowId, colId);
+ _variableSizeData.position(_fixedSizeData.getInt());
+ return _fixedSizeData.getInt();
+ }
+
+ @Override
+ public RowDataBlock toMetadataOnlyDataTable() {
+ RowDataBlock metadataOnlyDataTable = new RowDataBlock();
+ metadataOnlyDataTable._metadata.putAll(_metadata);
+ metadataOnlyDataTable._errCodeToExceptionMap.putAll(_errCodeToExceptionMap);
+ return metadataOnlyDataTable;
+ }
+
+ @Override
+ public RowDataBlock toDataOnlyDataTable() {
+ return new RowDataBlock(_numRows, _dataSchema, _dictionaryMap, _fixedSizeDataBytes, _variableSizeDataBytes);
+ }
+
+ // TODO: add whole-row access methods.
+}
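
The addressing in positionCursorInFixSizedBuffer is plain row-major arithmetic: each row occupies _rowSizeInBytes, and _columnOffsets[colId] gives the column's byte offset within a row. For variable-width columns, the fixed-size slot stores an (offset, length) int pair into the variable-size buffer, which positionCursorInVariableBuffer reads back. A toy sketch of the same cell addressing over a plain ByteBuffer, with an assumed two-column schema of an INT and a LONG:

    import java.nio.ByteBuffer;

    public class RowLayoutSketch {
      public static void main(String[] args) {
        // assumed schema: col0 = INT (offset 0), col1 = LONG (offset 4); 12 bytes per row
        int[] columnOffsets = {0, 4};
        int rowSizeInBytes = 12;

        ByteBuffer fixed = ByteBuffer.allocate(2 * rowSizeInBytes);
        fixed.putInt(7).putLong(70L);   // row 0
        fixed.putInt(8).putLong(80L);   // row 1

        // same arithmetic as positionCursorInFixSizedBuffer(rowId, colId)
        int rowId = 1;
        fixed.position(rowId * rowSizeInBytes + columnOffsets[0]);
        System.out.println(fixed.getInt());    // 8
        fixed.position(rowId * rowSizeInBytes + columnOffsets[1]);
        System.out.println(fixed.getLong());   // 80
      }
    }
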
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/blocks/DataTableBlock.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/blocks/TransferableBlock.java
similarity index 70%
rename from pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/blocks/DataTableBlock.java
rename to pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/blocks/TransferableBlock.java
index a8cc63c624..9284011f06 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/blocks/DataTableBlock.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/blocks/TransferableBlock.java
@@ -19,31 +19,39 @@
package org.apache.pinot.query.runtime.blocks;
import java.io.IOException;
-import org.apache.pinot.common.utils.DataTable;
import org.apache.pinot.core.common.Block;
import org.apache.pinot.core.common.BlockDocIdSet;
import org.apache.pinot.core.common.BlockDocIdValueSet;
import org.apache.pinot.core.common.BlockMetadata;
import org.apache.pinot.core.common.BlockValSet;
-import org.apache.pinot.core.operator.blocks.InstanceResponseBlock;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
/**
- * A {@code DataTableBlock} is a row-based data block backed by a {@link DataTable}.
+ * A {@code TransferableBlock} is a wrapper around {@link BaseDataBlock} for transferring data using
+ * {@link org.apache.pinot.common.proto.Mailbox}.
*/
-public class DataTableBlock implements Block {
- private static final Logger LOGGER = LoggerFactory.getLogger(InstanceResponseBlock.class);
+public class TransferableBlock implements Block {
- private DataTable _dataTable;
+ private BaseDataBlock _dataBlock;
+ private BaseDataBlock.Type _type;
- public DataTableBlock(DataTable dataTable) {
- _dataTable = dataTable;
+ public TransferableBlock(BaseDataBlock dataBlock) {
+ _dataBlock = dataBlock;
+ _type = dataBlock instanceof ColumnarDataBlock ? BaseDataBlock.Type.COLUMNAR
+ : dataBlock instanceof RowDataBlock ? BaseDataBlock.Type.ROW : BaseDataBlock.Type.METADATA;
}
- public DataTable getDataTable() {
- return _dataTable;
+ public BaseDataBlock getDataBlock() {
+ return _dataBlock;
+ }
+
+ public BaseDataBlock.Type getType() {
+ return _type;
+ }
+
+ public byte[] toBytes()
+ throws IOException {
+ return _dataBlock.toBytes();
}
@Override
@@ -65,9 +73,4 @@ public class DataTableBlock implements Block {
public BlockMetadata getMetadata() {
throw new UnsupportedOperationException();
}
-
- public byte[] toBytes()
- throws IOException {
- return _dataTable.toBytes();
- }
}
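
TransferableBlock is purely a transport shim: it classifies the wrapped block by instanceof and exposes toBytes() for the mailbox payload. A hedged sketch of the intended round trip, assuming DataBlockUtils.getDataBlock(ByteBuffer) reconstructs the concrete block type from the serialized header (rows and schema stand for a List<Object[]> and a DataSchema built elsewhere):

    // send side: build a row block from materialized rows and serialize it
    TransferableBlock out = new TransferableBlock(DataBlockBuilder.buildFromRows(rows, schema));
    byte[] payload = out.toBytes();              // becomes the Mailbox.MailboxContent payload

    // receive side: the version-type header selects the concrete class to rebuild
    BaseDataBlock restored = DataBlockUtils.getDataBlock(java.nio.ByteBuffer.wrap(payload));
    TransferableBlock in = new TransferableBlock(restored);
    // in.getType() should be BaseDataBlock.Type.ROW again
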
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/executor/WorkerQueryExecutor.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/executor/WorkerQueryExecutor.java
index 9a66bf86c1..d8ec43e040 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/executor/WorkerQueryExecutor.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/executor/WorkerQueryExecutor.java
@@ -36,8 +36,8 @@ import org.apache.pinot.query.planner.stage.MailboxReceiveNode;
import org.apache.pinot.query.planner.stage.MailboxSendNode;
import org.apache.pinot.query.planner.stage.ProjectNode;
import org.apache.pinot.query.planner.stage.StageNode;
-import org.apache.pinot.query.runtime.blocks.DataTableBlock;
-import org.apache.pinot.query.runtime.blocks.DataTableBlockUtils;
+import org.apache.pinot.query.runtime.blocks.DataBlockUtils;
+import org.apache.pinot.query.runtime.blocks.TransferableBlock;
import org.apache.pinot.query.runtime.operator.HashJoinOperator;
import org.apache.pinot.query.runtime.operator.MailboxReceiveOperator;
import org.apache.pinot.query.runtime.operator.MailboxSendOperator;
@@ -83,12 +83,12 @@ public class WorkerQueryExecutor {
ExecutorService executorService) {
long requestId = Long.parseLong(requestMetadataMap.get("REQUEST_ID"));
StageNode stageRoot = queryRequest.getStageRoot();
- BaseOperator<DataTableBlock> rootOperator = getOperator(requestId, stageRoot, queryRequest.getMetadataMap());
+ BaseOperator<TransferableBlock> rootOperator = getOperator(requestId, stageRoot, queryRequest.getMetadataMap());
executorService.submit(new TraceRunnable() {
@Override
public void runJob() {
ThreadTimer executionThreadTimer = new ThreadTimer();
- while (!DataTableBlockUtils.isEndOfStream(rootOperator.nextBlock())) {
+ while (!DataBlockUtils.isEndOfStream(rootOperator.nextBlock())) {
LOGGER.debug("Result Block acquired");
}
LOGGER.info("Execution time:" + executionThreadTimer.getThreadTimeNs());
@@ -97,7 +97,7 @@ public class WorkerQueryExecutor {
}
// TODO: split this PhysicalPlanner into a separate module
- private BaseOperator<DataTableBlock> getOperator(long requestId, StageNode stageNode,
+ private BaseOperator<TransferableBlock> getOperator(long requestId, StageNode stageNode,
Map<Integer, StageMetadata> metadataMap) {
// TODO: optimize this into a framework. (physical planner)
if (stageNode instanceof MailboxReceiveNode) {
@@ -107,15 +107,15 @@ public class WorkerQueryExecutor {
requestId, receiveNode.getSenderStageId());
} else if (stageNode instanceof MailboxSendNode) {
MailboxSendNode sendNode = (MailboxSendNode) stageNode;
- BaseOperator<DataTableBlock> nextOperator = getOperator(requestId, sendNode.getInputs().get(0), metadataMap);
+ BaseOperator<TransferableBlock> nextOperator = getOperator(requestId, sendNode.getInputs().get(0), metadataMap);
StageMetadata receivingStageMetadata = metadataMap.get(sendNode.getReceiverStageId());
return new MailboxSendOperator(_mailboxService, nextOperator, receivingStageMetadata.getServerInstances(),
sendNode.getExchangeType(), sendNode.getPartitionKeySelector(), _hostName, _port, requestId,
sendNode.getStageId());
} else if (stageNode instanceof JoinNode) {
JoinNode joinNode = (JoinNode) stageNode;
- BaseOperator<DataTableBlock> leftOperator = getOperator(requestId, joinNode.getInputs().get(0), metadataMap);
- BaseOperator<DataTableBlock> rightOperator = getOperator(requestId, joinNode.getInputs().get(1), metadataMap);
+ BaseOperator<TransferableBlock> leftOperator = getOperator(requestId, joinNode.getInputs().get(0), metadataMap);
+ BaseOperator<TransferableBlock> rightOperator = getOperator(requestId, joinNode.getInputs().get(1), metadataMap);
return new HashJoinOperator(leftOperator, rightOperator, joinNode.getCriteria());
} else if (stageNode instanceof FilterNode) {
throw new UnsupportedOperationException("Unsupported!");
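
The getOperator method above is a recursive descent over the stage tree: every StageNode maps to one physical operator, and child nodes become the new operator's upstream inputs. The pattern, reduced to a self-contained skeleton (the node classes here are illustrative stand-ins, not Pinot types):

    abstract class Node { Node[] _children = new Node[0]; }
    class Receive extends Node { }
    class Send extends Node { Send(Node child) { _children = new Node[]{child}; } }
    class Join extends Node { Join(Node l, Node r) { _children = new Node[]{l, r}; } }

    public class PlannerSketch {
      static String toOperator(Node node) {
        if (node instanceof Receive) {
          return "MAILBOX_RECEIVE";                                     // leaf
        } else if (node instanceof Send) {
          return "MAILBOX_SEND(" + toOperator(node._children[0]) + ")"; // unary
        } else if (node instanceof Join) {
          return "HASH_JOIN(" + toOperator(node._children[0]) + ", "
              + toOperator(node._children[1]) + ")";                    // binary
        }
        throw new UnsupportedOperationException("Unsupported node type");
      }

      public static void main(String[] args) {
        Node plan = new Send(new Join(new Receive(), new Receive()));
        System.out.println(toOperator(plan));
        // MAILBOX_SEND(HASH_JOIN(MAILBOX_RECEIVE, MAILBOX_RECEIVE))
      }
    }
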
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/HashJoinOperator.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/HashJoinOperator.java
index 238c1d807b..ff99dae5ed 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/HashJoinOperator.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/HashJoinOperator.java
@@ -24,14 +24,15 @@ import java.util.HashMap;
import java.util.List;
import javax.annotation.Nullable;
import org.apache.pinot.common.utils.DataSchema;
-import org.apache.pinot.common.utils.DataTable;
import org.apache.pinot.core.common.Operator;
import org.apache.pinot.core.operator.BaseOperator;
import org.apache.pinot.core.query.selection.SelectionOperatorUtils;
import org.apache.pinot.query.planner.partitioning.KeySelector;
import org.apache.pinot.query.planner.stage.JoinNode;
-import org.apache.pinot.query.runtime.blocks.DataTableBlock;
-import org.apache.pinot.query.runtime.blocks.DataTableBlockUtils;
+import org.apache.pinot.query.runtime.blocks.BaseDataBlock;
+import org.apache.pinot.query.runtime.blocks.DataBlockBuilder;
+import org.apache.pinot.query.runtime.blocks.DataBlockUtils;
+import org.apache.pinot.query.runtime.blocks.TransferableBlock;
/**
@@ -42,12 +43,12 @@ import org.apache.pinot.query.runtime.blocks.DataTableBlockUtils;
*
* <p>For each data block received from the left table, it generates a joined data block.
*/
-public class HashJoinOperator extends BaseOperator<DataTableBlock> {
+public class HashJoinOperator extends BaseOperator<TransferableBlock> {
private static final String EXPLAIN_NAME = "BROADCAST_JOIN";
private final HashMap<Object, List<Object[]>> _broadcastHashTable;
- private final BaseOperator<DataTableBlock> _leftTableOperator;
- private final BaseOperator<DataTableBlock> _rightTableOperator;
+ private final BaseOperator<TransferableBlock> _leftTableOperator;
+ private final BaseOperator<TransferableBlock> _rightTableOperator;
private DataSchema _leftTableSchema;
private DataSchema _rightTableSchema;
@@ -56,8 +57,8 @@ public class HashJoinOperator extends BaseOperator<DataTableBlock> {
private KeySelector<Object[], Object> _leftKeySelector;
private KeySelector<Object[], Object> _rightKeySelector;
- public HashJoinOperator(BaseOperator<DataTableBlock> leftTableOperator,
- BaseOperator<DataTableBlock> rightTableOperator, List<JoinNode.JoinClause> criteria) {
+ public HashJoinOperator(BaseOperator<TransferableBlock> leftTableOperator,
+ BaseOperator<TransferableBlock> rightTableOperator, List<JoinNode.JoinClause> criteria) {
// TODO: this assumes the right table is broadcast.
_leftKeySelector = criteria.get(0).getLeftJoinKeySelector();
_rightKeySelector = criteria.get(0).getRightJoinKeySelector();
@@ -80,25 +81,25 @@ public class HashJoinOperator extends BaseOperator<DataTableBlock> {
}
@Override
- protected DataTableBlock getNextBlock() {
+ protected TransferableBlock getNextBlock() {
buildBroadcastHashTable();
try {
- return new DataTableBlock(buildJoinedDataTable(_leftTableOperator.nextBlock()));
+ return new TransferableBlock(buildJoinedDataBlock(_leftTableOperator.nextBlock()));
} catch (Exception e) {
- return DataTableBlockUtils.getErrorDatatableBlock(e);
+ return DataBlockUtils.getErrorTransferableBlock(e);
}
}
private void buildBroadcastHashTable() {
if (!_isHashTableBuilt) {
- DataTableBlock rightBlock = _rightTableOperator.nextBlock();
- while (!DataTableBlockUtils.isEndOfStream(rightBlock)) {
- DataTable dataTable = rightBlock.getDataTable();
- _rightTableSchema = dataTable.getDataSchema();
- int numRows = dataTable.getNumberOfRows();
+ TransferableBlock rightBlock = _rightTableOperator.nextBlock();
+ while (!DataBlockUtils.isEndOfStream(rightBlock)) {
+ BaseDataBlock dataBlock = rightBlock.getDataBlock();
+ _rightTableSchema = dataBlock.getDataSchema();
+ int numRows = dataBlock.getNumberOfRows();
// put all the rows into corresponding hash collections keyed by the key selector function.
for (int rowId = 0; rowId < numRows; rowId++) {
- Object[] objects = SelectionOperatorUtils.extractRowFromDataTable(dataTable, rowId);
+ Object[] objects = SelectionOperatorUtils.extractRowFromDataTable(dataBlock, rowId);
List<Object[]> hashCollection =
_broadcastHashTable.computeIfAbsent(_rightKeySelector.getKey(objects), k -> new ArrayList<>());
hashCollection.add(objects);
@@ -109,25 +110,25 @@ public class HashJoinOperator extends BaseOperator<DataTableBlock> {
}
}
- private DataTable buildJoinedDataTable(DataTableBlock block)
+ private BaseDataBlock buildJoinedDataBlock(TransferableBlock block)
throws Exception {
- if (DataTableBlockUtils.isEndOfStream(block)) {
- return DataTableBlockUtils.getEndOfStreamDataTable();
+ if (DataBlockUtils.isEndOfStream(block)) {
+ return DataBlockUtils.getEndOfStreamDataBlock();
}
List<Object[]> rows = new ArrayList<>();
- DataTable dataTable = block.getDataTable();
- _leftTableSchema = dataTable.getDataSchema();
+ BaseDataBlock dataBlock = block.getDataBlock();
+ _leftTableSchema = dataBlock.getDataSchema();
_resultRowSize = _leftTableSchema.size() + _rightTableSchema.size();
- int numRows = dataTable.getNumberOfRows();
+ int numRows = dataBlock.getNumberOfRows();
for (int rowId = 0; rowId < numRows; rowId++) {
- Object[] leftRow = SelectionOperatorUtils.extractRowFromDataTable(dataTable, rowId);
+ Object[] leftRow = SelectionOperatorUtils.extractRowFromDataTable(dataBlock, rowId);
List<Object[]> hashCollection =
_broadcastHashTable.getOrDefault(_leftKeySelector.getKey(leftRow), Collections.emptyList());
for (Object[] rightRow : hashCollection) {
rows.add(joinRow(leftRow, rightRow));
}
}
- return SelectionOperatorUtils.getDataTableFromRows(rows, computeSchema());
+ return DataBlockBuilder.buildFromRows(rows, computeSchema());
}
private Object[] joinRow(Object[] leftRow, Object[] rightRow) {
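
The two phases above are a textbook broadcast hash join: drain the broadcast (right) side into a hash table keyed by the join key, then stream the left side and probe, emitting one concatenated row per match. A self-contained sketch over plain Object[] rows, with key selectors reduced to column-picking functions (the real KeySelector is more general):

    import java.util.*;
    import java.util.function.Function;

    public class BroadcastHashJoinSketch {
      static List<Object[]> join(List<Object[]> left, List<Object[]> right,
          Function<Object[], Object> leftKey, Function<Object[], Object> rightKey) {
        // build phase: hash the broadcast side once
        Map<Object, List<Object[]>> table = new HashMap<>();
        for (Object[] row : right) {
          table.computeIfAbsent(rightKey.apply(row), k -> new ArrayList<>()).add(row);
        }
        // probe phase: stream the left side, concatenate matching rows
        List<Object[]> out = new ArrayList<>();
        for (Object[] l : left) {
          for (Object[] r : table.getOrDefault(leftKey.apply(l), Collections.emptyList())) {
            Object[] joined = Arrays.copyOf(l, l.length + r.length);
            System.arraycopy(r, 0, joined, l.length, r.length);
            out.add(joined);
          }
        }
        return out;
      }

      public static void main(String[] args) {
        List<Object[]> left = List.of(new Object[]{"a", 1}, new Object[]{"b", 2});
        List<Object[]> right = List.of(new Object[]{1, "x"}, new Object[]{2, "y"});
        join(left, right, row -> row[1], row -> row[0])
            .forEach(row -> System.out.println(Arrays.toString(row)));
        // [a, 1, 1, x]
        // [b, 2, 2, y]
      }
    }
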
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxReceiveOperator.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxReceiveOperator.java
index 7b396b13cc..b84d8f40f0 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxReceiveOperator.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxReceiveOperator.java
@@ -24,16 +24,15 @@ import javax.annotation.Nullable;
import org.apache.calcite.rel.RelDistribution;
import org.apache.pinot.common.proto.Mailbox;
import org.apache.pinot.common.utils.DataSchema;
-import org.apache.pinot.common.utils.DataTable;
import org.apache.pinot.core.common.Operator;
-import org.apache.pinot.core.common.datatable.DataTableFactory;
import org.apache.pinot.core.operator.BaseOperator;
import org.apache.pinot.core.transport.ServerInstance;
import org.apache.pinot.query.mailbox.MailboxService;
import org.apache.pinot.query.mailbox.ReceivingMailbox;
import org.apache.pinot.query.mailbox.StringMailboxIdentifier;
-import org.apache.pinot.query.runtime.blocks.DataTableBlock;
-import org.apache.pinot.query.runtime.blocks.DataTableBlockUtils;
+import org.apache.pinot.query.runtime.blocks.BaseDataBlock;
+import org.apache.pinot.query.runtime.blocks.DataBlockUtils;
+import org.apache.pinot.query.runtime.blocks.TransferableBlock;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -42,7 +41,7 @@ import org.slf4j.LoggerFactory;
* This {@code MailboxReceiveOperator} receives data from a {@link ReceivingMailbox} and serves it out from the
* {@link BaseOperator#getNextBlock()} API.
*/
-public class MailboxReceiveOperator extends BaseOperator<DataTableBlock> {
+public class MailboxReceiveOperator extends BaseOperator<TransferableBlock> {
private static final Logger LOGGER = LoggerFactory.getLogger(MailboxReceiveOperator.class);
private static final String EXPLAIN_NAME = "MAILBOX_RECEIVE";
private static final long DEFAULT_TIMEOUT_NANO = 10_000_000_000L;
@@ -80,7 +79,7 @@ public class MailboxReceiveOperator extends BaseOperator<DataTableBlock> {
}
@Override
- protected DataTableBlock getNextBlock() {
+ protected TransferableBlock getNextBlock() {
// TODO: do a round-robin check against all MailboxContentStreamObservers and find which one has data.
boolean hasOpenedMailbox = true;
DataSchema dataSchema = null;
@@ -99,10 +98,10 @@ public class MailboxReceiveOperator extends BaseOperator<DataTableBlock> {
if (mailboxContent != null) {
ByteBuffer byteBuffer = mailboxContent.getPayload().asReadOnlyByteBuffer();
if (byteBuffer.hasRemaining()) {
- DataTable dataTable = DataTableFactory.getDataTable(byteBuffer);
- if (dataTable.getNumberOfRows() > 0) {
+ BaseDataBlock dataBlock = DataBlockUtils.getDataBlock(byteBuffer);
+ if (dataBlock.getNumberOfRows() > 0) {
// here we only return the data block when it is not empty.
- return new DataTableBlock(dataTable);
+ return new TransferableBlock(dataBlock);
}
}
}
@@ -117,7 +116,7 @@ public class MailboxReceiveOperator extends BaseOperator<DataTableBlock> {
}
// TODO: we need to at least return one data block with schema if there's no error.
// we need to condition this on whether anything has already been returned or not.
- return DataTableBlockUtils.getEndOfStreamDataTableBlock();
+ return DataBlockUtils.getEndOfStreamTransferableBlock();
}
public RelDistribution.Type getExchangeType() {
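
getNextBlock here is a polling loop: scan every opened mailbox, hand back the first non-empty data block, and otherwise eventually fall through to an end-of-stream block once all mailboxes are exhausted or the timeout passes. The loop's shape, as a generic sketch (a Supplier returning null stands in for "no content yet"):

    import java.util.List;
    import java.util.function.Supplier;

    public class PollSketch {
      // returns the first non-null payload from any source, or null on deadline
      static <T> T pollAny(List<Supplier<T>> sources, long timeoutNanos) {
        long deadline = System.nanoTime() + timeoutNanos;
        while (System.nanoTime() < deadline) {
          for (Supplier<T> source : sources) {
            T block = source.get();
            if (block != null) {
              return block;        // first non-empty payload wins
            }
          }
        }
        return null;               // caller maps this to an end-of-stream block
      }
    }
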
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxSendOperator.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxSendOperator.java
index 51f24c97d4..8629663b60 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxSendOperator.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxSendOperator.java
@@ -37,16 +37,18 @@ import org.apache.pinot.query.mailbox.MailboxService;
import org.apache.pinot.query.mailbox.SendingMailbox;
import org.apache.pinot.query.mailbox.StringMailboxIdentifier;
import org.apache.pinot.query.planner.partitioning.KeySelector;
-import org.apache.pinot.query.runtime.blocks.DataTableBlock;
-import org.apache.pinot.query.runtime.blocks.DataTableBlockUtils;
+import org.apache.pinot.query.runtime.blocks.BaseDataBlock;
+import org.apache.pinot.query.runtime.blocks.DataBlockBuilder;
+import org.apache.pinot.query.runtime.blocks.DataBlockUtils;
+import org.apache.pinot.query.runtime.blocks.TransferableBlock;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
- * This {@code MailboxSendOperator} is created to send {@link DataTableBlock}s to the receiving end.
+ * This {@code MailboxSendOperator} is created to send {@link TransferableBlock}s to the receiving end.
*/
-public class MailboxSendOperator extends BaseOperator<DataTableBlock> {
+public class MailboxSendOperator extends BaseOperator<TransferableBlock> {
private static final Logger LOGGER = LoggerFactory.getLogger(MailboxSendOperator.class);
private static final String EXPLAIN_NAME = "MAILBOX_SEND";
private static final Set<RelDistribution.Type> SUPPORTED_EXCHANGE_TYPE =
@@ -61,11 +63,11 @@ public class MailboxSendOperator extends BaseOperator<DataTableBlock> {
private final long _jobId;
private final int _stageId;
private final MailboxService<Mailbox.MailboxContent> _mailboxService;
- private BaseOperator<DataTableBlock> _dataTableBlockBaseOperator;
- private DataTable _dataTable;
+ private BaseOperator<TransferableBlock> _dataTableBlockBaseOperator;
+ private BaseDataBlock _dataTable;
public MailboxSendOperator(MailboxService<Mailbox.MailboxContent> mailboxService,
- BaseOperator<DataTableBlock> dataTableBlockBaseOperator, List<ServerInstance> receivingStageInstances,
+ BaseOperator<TransferableBlock> dataTableBlockBaseOperator, List<ServerInstance> receivingStageInstances,
RelDistribution.Type exchangeType, KeySelector<Object[], Object> keySelector, String hostName, int port,
long jobId, int stageId) {
_mailboxService = mailboxService;
@@ -86,7 +88,7 @@ public class MailboxSendOperator extends BaseOperator<DataTableBlock> {
* we create a {@link org.apache.pinot.core.query.executor.ServerQueryExecutorV1Impl} that can handle the
* creation of MailboxSendOperator, we should not use this API.
*/
- public MailboxSendOperator(MailboxService<Mailbox.MailboxContent> mailboxService, DataTable dataTable,
+ public MailboxSendOperator(MailboxService<Mailbox.MailboxContent> mailboxService, BaseDataBlock dataTable,
List<ServerInstance> receivingStageInstances, RelDistribution.Type exchangeType,
KeySelector<Object[], Object> keySelector, String hostName, int port, long jobId, int stageId) {
_mailboxService = mailboxService;
@@ -113,14 +115,14 @@ public class MailboxSendOperator extends BaseOperator<DataTableBlock> {
}
@Override
- protected DataTableBlock getNextBlock() {
- DataTable dataTable;
- DataTableBlock dataTableBlock = null;
+ protected TransferableBlock getNextBlock() {
+ BaseDataBlock dataTable;
+ TransferableBlock transferableBlock = null;
boolean isEndOfStream;
if (_dataTableBlockBaseOperator != null) {
- dataTableBlock = _dataTableBlockBaseOperator.nextBlock();
- dataTable = dataTableBlock.getDataTable();
- isEndOfStream = DataTableBlockUtils.isEndOfStream(dataTableBlock);
+ transferableBlock = _dataTableBlockBaseOperator.nextBlock();
+ dataTable = transferableBlock.getDataBlock();
+ isEndOfStream = DataBlockUtils.isEndOfStream(transferableBlock);
} else {
dataTable = _dataTable;
isEndOfStream = true;
@@ -137,7 +139,7 @@ public class MailboxSendOperator extends BaseOperator<DataTableBlock> {
// we no longer need to send data to the rest of the receiving instances, but we still need to transfer
// the dataTable over, indicating that we are a potential sender. Thus, next time a random server is selected
// it might still be useful.
- dataTable = DataTableBlockUtils.getEmptyDataTable(dataTable.getDataSchema());
+ dataTable = DataBlockUtils.getEmptyDataBlock(dataTable.getDataSchema());
}
break;
case BROADCAST_DISTRIBUTED:
@@ -147,7 +149,7 @@ public class MailboxSendOperator extends BaseOperator<DataTableBlock> {
break;
case HASH_DISTRIBUTED:
// TODO: ensure that the server instance list is sorted using the same function as the sender.
- List<DataTable> dataTableList = constructPartitionedDataBlock(dataTable, _keySelector,
+ List<BaseDataBlock> dataTableList = constructPartitionedDataBlock(dataTable, _keySelector,
_receivingStageInstances.size());
for (int i = 0; i < _receivingStageInstances.size(); i++) {
sendDataTableBlock(_receivingStageInstances.get(i), dataTableList.get(i), isEndOfStream);
@@ -162,10 +164,10 @@ public class MailboxSendOperator extends BaseOperator<DataTableBlock> {
} catch (Exception e) {
LOGGER.error("Exception occur while sending data via mailbox", e);
}
- return dataTableBlock;
+ return transferableBlock;
}
- private static List<DataTable> constructPartitionedDataBlock(DataTable dataTable,
+ private static List<BaseDataBlock> constructPartitionedDataBlock(DataTable dataTable,
KeySelector<Object[], Object> keySelector, int partitionSize)
throws Exception {
List<List<Object[]>> temporaryRows = new ArrayList<>(partitionSize);
@@ -178,10 +180,10 @@ public class MailboxSendOperator extends BaseOperator<DataTableBlock> {
// TODO: support other partitioning algorithms
temporaryRows.get(hashToIndex(key, partitionSize)).add(row);
}
- List<DataTable> dataTableList = new ArrayList<>(partitionSize);
+ List<BaseDataBlock> dataTableList = new ArrayList<>(partitionSize);
for (int i = 0; i < partitionSize; i++) {
List<Object[]> objects = temporaryRows.get(i);
- dataTableList.add(SelectionOperatorUtils.getDataTableFromRows(objects, dataTable.getDataSchema()));
+ dataTableList.add(DataBlockBuilder.buildFromRows(objects, dataTable.getDataSchema()));
}
return dataTableList;
}
@@ -190,7 +192,7 @@ public class MailboxSendOperator extends BaseOperator<DataTableBlock> {
return (key.hashCode()) % partitionSize;
}
- private void sendDataTableBlock(ServerInstance serverInstance, DataTable dataTable, boolean isEndOfStream)
+ private void sendDataTableBlock(ServerInstance serverInstance, BaseDataBlock dataTable, boolean isEndOfStream)
throws IOException {
String mailboxId = toMailboxId(serverInstance);
SendingMailbox<Mailbox.MailboxContent> sendingMailbox = _mailboxService.getSendingMailbox(mailboxId);
@@ -201,10 +203,10 @@ public class MailboxSendOperator extends BaseOperator<DataTableBlock> {
}
}
- private Mailbox.MailboxContent toMailboxContent(String mailboxId, DataTable dataTable, boolean isEndOfStream)
+ private Mailbox.MailboxContent toMailboxContent(String mailboxId, BaseDataBlock dataTable, boolean isEndOfStream)
throws IOException {
Mailbox.MailboxContent.Builder builder = Mailbox.MailboxContent.newBuilder().setMailboxId(mailboxId)
- .setPayload(ByteString.copyFrom(new DataTableBlock(dataTable).toBytes()));
+ .setPayload(ByteString.copyFrom(new TransferableBlock(dataTable).toBytes()));
if (isEndOfStream) {
builder.putMetadata("finished", "true");
}
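
One caveat in hashToIndex above: Java's % operator can return a negative value when key.hashCode() is negative ((-7) % 3 == -1), which would make the partition index invalid. A defensive variant using Math.floorMod, as a sketch (how or whether the surrounding code guards against this is not visible in the diff):

    // Math.floorMod always yields a result in [0, partitionSize)
    static int hashToIndex(Object key, int partitionSize) {
      return Math.floorMod(key.hashCode(), partitionSize);
    }
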
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/QueryDispatcher.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/QueryDispatcher.java
index 7ea69b8d21..fa9f04808c 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/QueryDispatcher.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/QueryDispatcher.java
@@ -33,8 +33,9 @@ import org.apache.pinot.query.mailbox.MailboxService;
import org.apache.pinot.query.planner.QueryPlan;
import org.apache.pinot.query.planner.StageMetadata;
import org.apache.pinot.query.planner.stage.MailboxReceiveNode;
-import org.apache.pinot.query.runtime.blocks.DataTableBlock;
-import org.apache.pinot.query.runtime.blocks.DataTableBlockUtils;
+import org.apache.pinot.query.runtime.blocks.BaseDataBlock;
+import org.apache.pinot.query.runtime.blocks.DataBlockUtils;
+import org.apache.pinot.query.runtime.blocks.TransferableBlock;
import org.apache.pinot.query.runtime.operator.MailboxReceiveOperator;
import org.apache.pinot.query.runtime.plan.DistributedStagePlan;
import org.apache.pinot.query.runtime.plan.serde.QueryPlanSerDeUtils;
@@ -58,24 +59,13 @@ public class QueryDispatcher {
throws Exception {
// submit all the distributed stages.
int reduceStageId = submit(requestId, queryPlan);
-
- // run reduce stage.
+ // run reduce stage and return result.
MailboxReceiveNode reduceNode = (MailboxReceiveNode) queryPlan.getQueryStageMap().get(reduceStageId);
MailboxReceiveOperator mailboxReceiveOperator = createReduceStageOperator(mailboxService,
queryPlan.getStageMetadataMap().get(reduceNode.getSenderStageId()).getServerInstances(),
requestId, reduceNode.getSenderStageId(), mailboxService.getHostname(),
mailboxService.getMailboxPort());
-
- List<DataTable> queryResults = new ArrayList<>();
- long timeoutWatermark = System.nanoTime() + timeoutNano;
- while (System.nanoTime() < timeoutWatermark) {
- DataTableBlock dataTableBlock = mailboxReceiveOperator.nextBlock();
- queryResults.add(dataTableBlock.getDataTable());
- if (DataTableBlockUtils.isEndOfStream(dataTableBlock)) {
- break;
- }
- }
- return queryResults;
+ return reduceMailboxReceive(mailboxReceiveOperator);
}
public int submit(long requestId, QueryPlan queryPlan)
@@ -109,12 +99,9 @@ public class QueryDispatcher {
return reduceStageId;
}
- protected MailboxReceiveOperator createReduceStageOperator(MailboxService<Mailbox.MailboxContent> mailboxService,
- List<ServerInstance> sendingInstances, long jobId, int stageId, String hostname, int port) {
- MailboxReceiveOperator mailboxReceiveOperator =
- new MailboxReceiveOperator(mailboxService, RelDistribution.Type.ANY, sendingInstances, hostname, port, jobId,
- stageId);
- return mailboxReceiveOperator;
+ private DispatchClient getOrCreateDispatchClient(String host, int port) {
+ String key = String.format("%s_%d", host, port);
+ return _dispatchClientMap.computeIfAbsent(key, k -> new DispatchClient(host, port));
}
public static DistributedStagePlan constructDistributedStagePlan(QueryPlan queryPlan, int stageId,
@@ -123,9 +110,28 @@ public class QueryDispatcher {
queryPlan.getStageMetadataMap());
}
- private DispatchClient getOrCreateDispatchClient(String host, int port) {
- String key = String.format("%s_%d", host, port);
- return _dispatchClientMap.computeIfAbsent(key, k -> new DispatchClient(host, port));
+ public static List<DataTable> reduceMailboxReceive(MailboxReceiveOperator mailboxReceiveOperator) {
+ List<DataTable> resultDataBlocks = new ArrayList<>();
+ TransferableBlock transferableBlock;
+ while (true) {
+ transferableBlock = mailboxReceiveOperator.nextBlock();
+ if (DataBlockUtils.isEndOfStream(transferableBlock)) {
+ break;
+ }
+ if (transferableBlock.getDataBlock() != null) {
+ BaseDataBlock dataTable = transferableBlock.getDataBlock();
+ resultDataBlocks.add(dataTable);
+ }
+ }
+ return resultDataBlocks;
+ }
+
+ public static MailboxReceiveOperator createReduceStageOperator(MailboxService<Mailbox.MailboxContent> mailboxService,
+ List<ServerInstance> sendingInstances, long jobId, int stageId, String hostname, int port) {
+ MailboxReceiveOperator mailboxReceiveOperator =
+ new MailboxReceiveOperator(mailboxService, RelDistribution.Type.ANY, sendingInstances, hostname, port, jobId,
+ stageId);
+ return mailboxReceiveOperator;
}
public static class DispatchClient {
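
Note that reduceMailboxReceive loops until an end-of-stream block arrives; the caller-supplied timeoutNano watermark from the old inline loop is gone, so termination now leans on MailboxReceiveOperator's internal timeout. A deadline-bounded variant, as a sketch using the same classes:

    static List<DataTable> reduceWithDeadline(MailboxReceiveOperator operator, long timeoutNano) {
      List<DataTable> results = new ArrayList<>();
      long deadline = System.nanoTime() + timeoutNano;   // restore the explicit watermark
      while (System.nanoTime() < deadline) {
        TransferableBlock block = operator.nextBlock();
        if (DataBlockUtils.isEndOfStream(block)) {
          break;
        }
        if (block.getDataBlock() != null) {
          results.add(block.getDataBlock());
        }
      }
      return results;
    }
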
diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/mailbox/GrpcMailboxServiceTest.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/mailbox/GrpcMailboxServiceTest.java
index f1863bc339..e254778db1 100644
--- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/mailbox/GrpcMailboxServiceTest.java
+++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/mailbox/GrpcMailboxServiceTest.java
@@ -24,8 +24,8 @@ import com.google.protobuf.ByteString;
import java.io.IOException;
import java.util.Map;
import org.apache.pinot.common.proto.Mailbox;
-import org.apache.pinot.core.common.datatable.DataTableBuilder;
-import org.apache.pinot.query.runtime.blocks.DataTableBlock;
+import org.apache.pinot.query.runtime.blocks.DataBlockUtils;
+import org.apache.pinot.query.runtime.blocks.TransferableBlock;
import org.apache.pinot.util.TestUtils;
import org.testng.Assert;
import org.testng.annotations.Test;
@@ -67,6 +67,7 @@ public class GrpcMailboxServiceTest extends GrpcMailboxServiceTestBase {
throws IOException {
return Mailbox.MailboxContent.newBuilder().setMailboxId(mailboxId)
.putAllMetadata(ImmutableMap.of("key", "value", "finished", "true"))
- .setPayload(ByteString.copyFrom(new DataTableBlock(DataTableBuilder.getEmptyDataTable()).toBytes())).build();
+ .setPayload(ByteString.copyFrom(new TransferableBlock(DataBlockUtils.getEndOfStreamDataBlock()).toBytes()))
+ .build();
}
}
diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/QueryRunnerTest.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/QueryRunnerTest.java
index 2316195bce..d133584797 100644
--- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/QueryRunnerTest.java
+++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/QueryRunnerTest.java
@@ -27,7 +27,6 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Random;
-import org.apache.calcite.rel.RelDistribution;
import org.apache.commons.io.FileUtils;
import org.apache.pinot.common.utils.DataTable;
import org.apache.pinot.core.transport.ServerInstance;
@@ -38,8 +37,6 @@ import org.apache.pinot.query.mailbox.GrpcMailboxService;
import org.apache.pinot.query.planner.QueryPlan;
import org.apache.pinot.query.planner.stage.MailboxReceiveNode;
import org.apache.pinot.query.routing.WorkerInstance;
-import org.apache.pinot.query.runtime.blocks.DataTableBlock;
-import org.apache.pinot.query.runtime.blocks.DataTableBlockUtils;
import org.apache.pinot.query.runtime.operator.MailboxReceiveOperator;
import org.apache.pinot.query.runtime.plan.DistributedStagePlan;
import org.apache.pinot.query.service.QueryConfig;
@@ -84,8 +81,8 @@ public class QueryRunnerTest {
_mailboxService = new GrpcMailboxService(_reducerHostname, _reducerGrpcPort);
_mailboxService.start();
- _queryEnvironment =
- QueryEnvironmentTestUtils.getQueryEnvironment(_reducerGrpcPort, server1.getPort(), server2.getPort());
+ _queryEnvironment = QueryEnvironmentTestUtils.getQueryEnvironment(_reducerGrpcPort, server1.getPort(),
+ server2.getPort());
server1.start();
server2.start();
// this doesn't test the QueryServer functionality so the server port can be the same as the mailbox port.
@@ -111,9 +108,10 @@ public class QueryRunnerTest {
for (int stageId : queryPlan.getStageMetadataMap().keySet()) {
if (queryPlan.getQueryStageMap().get(stageId) instanceof MailboxReceiveNode) {
MailboxReceiveNode reduceNode = (MailboxReceiveNode) queryPlan.getQueryStageMap().get(stageId);
- mailboxReceiveOperator = createReduceStageOperator(
+ mailboxReceiveOperator = QueryDispatcher.createReduceStageOperator(_mailboxService,
queryPlan.getStageMetadataMap().get(reduceNode.getSenderStageId()).getServerInstances(),
- Long.parseLong(requestMetadataMap.get("REQUEST_ID")), reduceNode.getSenderStageId(), _reducerGrpcPort);
+ Long.parseLong(requestMetadataMap.get("REQUEST_ID")), reduceNode.getSenderStageId(), "localhost",
+ _reducerGrpcPort);
} else {
for (ServerInstance serverInstance : queryPlan.getStageMetadataMap().get(stageId).getServerInstances()) {
DistributedStagePlan distributedStagePlan =
@@ -124,10 +122,21 @@ public class QueryRunnerTest {
}
Preconditions.checkNotNull(mailboxReceiveOperator);
- List<Object[]> resultRows = reduceMailboxReceive(mailboxReceiveOperator);
+ List<Object[]> resultRows = toRows(QueryDispatcher.reduceMailboxReceive(mailboxReceiveOperator));
Assert.assertEquals(resultRows.size(), expectedRowCount);
}
+ private static List<Object[]> toRows(List<DataTable> dataTables) {
+ List<Object[]> resultRows = new ArrayList<>();
+ for (DataTable dataTable : dataTables) {
+ int numRows = dataTable.getNumberOfRows();
+ for (int rowId = 0; rowId < numRows; rowId++) {
+ resultRows.add(extractRowFromDataTable(dataTable, rowId));
+ }
+ }
+ return resultRows;
+ }
+
@DataProvider(name = "testDataWithSqlToFinalRowCount")
private Object[][] provideTestSqlAndRowCount() {
return new Object[][] {
@@ -152,31 +161,4 @@ public class QueryRunnerTest {
+ " WHERE a.col3 >= 0 AND a.col2 = 'foo' AND b.col3 >= 0", 3},
};
}
-
- protected static List<Object[]> reduceMailboxReceive(MailboxReceiveOperator mailboxReceiveOperator) {
- List<Object[]> resultRows = new ArrayList<>();
- DataTableBlock dataTableBlock;
- while (true) {
- dataTableBlock = mailboxReceiveOperator.nextBlock();
- if (DataTableBlockUtils.isEndOfStream(dataTableBlock)) {
- break;
- }
- if (dataTableBlock.getDataTable() != null) {
- DataTable dataTable = dataTableBlock.getDataTable();
- int numRows = dataTable.getNumberOfRows();
- for (int rowId = 0; rowId < numRows; rowId++) {
- resultRows.add(extractRowFromDataTable(dataTable, rowId));
- }
- }
- }
- return resultRows;
- }
-
- protected MailboxReceiveOperator createReduceStageOperator(List<ServerInstance> sendingInstances, long jobId,
- int stageId, int port) {
- MailboxReceiveOperator mailboxReceiveOperator =
- new MailboxReceiveOperator(_mailboxService, RelDistribution.Type.ANY, sendingInstances, "localhost", port,
- jobId, stageId);
- return mailboxReceiveOperator;
- }
}
diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/blocks/DataBlockTest.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/blocks/DataBlockTest.java
new file mode 100644
index 0000000000..bf33160b92
--- /dev/null
+++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/blocks/DataBlockTest.java
@@ -0,0 +1,80 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.query.runtime.blocks;
+
+import java.io.IOException;
+import java.util.List;
+import org.apache.pinot.common.exception.QueryException;
+import org.apache.pinot.common.response.ProcessingException;
+import org.apache.pinot.common.utils.DataSchema;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+
+public class DataBlockTest {
+ private static final int TEST_ROW_COUNT = 2;
+
+ @Test
+ public void testException()
+ throws IOException {
+ Exception originalException = new UnsupportedOperationException("Expected test exception.");
+ ProcessingException processingException =
+ QueryException.getException(QueryException.QUERY_EXECUTION_ERROR, originalException);
+ String expected = processingException.getMessage();
+
+ BaseDataBlock dataBlock = DataBlockUtils.getErrorDataBlock(originalException);
+ dataBlock.addException(processingException);
+ Assert.assertEquals(dataBlock.getDataSchema().getColumnNames().length, 0);
+ Assert.assertEquals(dataBlock.getDataSchema().getColumnDataTypes().length, 0);
+ Assert.assertEquals(dataBlock.getNumberOfRows(), 0);
+
+ // Assert that the processing exception and the original exception both match.
+ String actual = dataBlock.getExceptions().get(QueryException.QUERY_EXECUTION_ERROR.getErrorCode());
+ Assert.assertEquals(actual, expected);
+ Assert.assertEquals(dataBlock.getExceptions().get(QueryException.UNKNOWN_ERROR_CODE),
+ originalException.getMessage());
+ }
+
+ @Test
+ public void testAllDataTypes()
+ throws IOException {
+ DataSchema.ColumnDataType[] columnDataTypes = DataSchema.ColumnDataType.values();
+ int numColumns = columnDataTypes.length;
+ String[] columnNames = new String[numColumns];
+ for (int i = 0; i < numColumns; i++) {
+ columnNames[i] = columnDataTypes[i].name();
+ }
+
+ DataSchema dataSchema = new DataSchema(columnNames, columnDataTypes);
+ List<Object[]> rows = DataBlockTestUtils.getRandomRows(dataSchema, TEST_ROW_COUNT);
+ List<Object[]> columnars = DataBlockTestUtils.convertColumnar(dataSchema, rows);
+ RowDataBlock rowBlock = DataBlockBuilder.buildFromRows(rows, dataSchema);
+ ColumnarDataBlock columnarBlock = DataBlockBuilder.buildFromColumns(columnars, dataSchema);
+
+ for (int colId = 0; colId < dataSchema.getColumnNames().length; colId++) {
+ DataSchema.ColumnDataType columnDataType = dataSchema.getColumnDataType(colId);
+ for (int rowId = 0; rowId < TEST_ROW_COUNT; rowId++) {
+ Object rowVal = DataBlockTestUtils.getElement(rowBlock, rowId, colId, columnDataType);
+ Object colVal = DataBlockTestUtils.getElement(columnarBlock, rowId, colId, columnDataType);
+ Assert.assertEquals(rowVal, colVal, "Error comparing Row/Column Block at (" + rowId + "," + colId + ")"
+ + " of Type: " + columnDataType + "! rowValue: [" + rowVal + "], columnarValue: [" + colVal + "]");
+ }
+ }
+ }
+}
diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/blocks/DataBlockTestUtils.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/blocks/DataBlockTestUtils.java
new file mode 100644
index 0000000000..caa9b55ccf
--- /dev/null
+++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/blocks/DataBlockTestUtils.java
@@ -0,0 +1,181 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.query.runtime.blocks;
+
+import java.math.BigDecimal;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Random;
+import org.apache.commons.lang.RandomStringUtils;
+import org.apache.pinot.common.utils.DataSchema;
+import org.apache.pinot.spi.utils.ByteArray;
+
+
+public class DataBlockTestUtils {
+ private static final Random RANDOM = new Random();
+ private static final int ARRAY_SIZE = 5;
+
+ private DataBlockTestUtils() {
+ // do not instantiate.
+ }
+
+ public static Object[] getRandomRow(DataSchema dataSchema) {
+ final int numColumns = dataSchema.getColumnNames().length;
+ DataSchema.ColumnDataType[] columnDataTypes = dataSchema.getColumnDataTypes();
+ Object[] row = new Object[numColumns];
+ for (int colId = 0; colId < numColumns; colId++) {
+ switch (columnDataTypes[colId].getStoredType()) {
+ case INT:
+ row[colId] = RANDOM.nextInt();
+ break;
+ case LONG:
+ row[colId] = RANDOM.nextLong();
+ break;
+ case FLOAT:
+ row[colId] = RANDOM.nextFloat();
+ break;
+ case DOUBLE:
+ row[colId] = RANDOM.nextDouble();
+ break;
+ case BIG_DECIMAL:
+ row[colId] = BigDecimal.valueOf(RANDOM.nextDouble());
+ break;
+ case STRING:
+ row[colId] = RandomStringUtils.random(RANDOM.nextInt(20));
+ break;
+ case BYTES:
+ row[colId] = new ByteArray(RandomStringUtils.random(RANDOM.nextInt(20)).getBytes());
+ break;
+ // Just test Double here; all object types will be covered in ObjectCustomSerDeTest.
+ case OBJECT:
+ row[colId] = RANDOM.nextDouble();
+ break;
+ case BOOLEAN_ARRAY:
+ case INT_ARRAY:
+ int length = RANDOM.nextInt(ARRAY_SIZE);
+ int[] intArray = new int[length];
+ for (int i = 0; i < length; i++) {
+ intArray[i] = RANDOM.nextInt();
+ }
+ row[colId] = intArray;
+ break;
+ case TIMESTAMP_ARRAY:
+ case LONG_ARRAY:
+ length = RANDOM.nextInt(ARRAY_SIZE);
+ long[] longArray = new long[length];
+ for (int i = 0; i < length; i++) {
+ longArray[i] = RANDOM.nextLong();
+ }
+ row[colId] = longArray;
+ break;
+ case FLOAT_ARRAY:
+ length = RANDOM.nextInt(ARRAY_SIZE);
+ float[] floatArray = new float[length];
+ for (int i = 0; i < length; i++) {
+ floatArray[i] = RANDOM.nextFloat();
+ }
+ row[colId] = floatArray;
+ break;
+ case DOUBLE_ARRAY:
+ length = RANDOM.nextInt(ARRAY_SIZE);
+ double[] doubleArray = new double[length];
+ for (int i = 0; i < length; i++) {
+ doubleArray[i] = RANDOM.nextDouble();
+ }
+ row[colId] = doubleArray;
+ break;
+ case BYTES_ARRAY:
+ case STRING_ARRAY:
+ length = RANDOM.nextInt(ARRAY_SIZE);
+ String[] stringArray = new String[length];
+ for (int i = 0; i < length; i++) {
+ stringArray[i] = RandomStringUtils.random(RANDOM.nextInt(20));
+ }
+ row[colId] = stringArray;
+ break;
+ default:
+ throw new UnsupportedOperationException("Can't fill random data for column type: " + columnDataTypes[colId]);
+ }
+ }
+ return row;
+ }
+
+ public static Object getElement(BaseDataBlock dataBlock, int rowId, int colId,
+ DataSchema.ColumnDataType columnDataType) {
+ switch (columnDataType.getStoredType()) {
+ case INT:
+ return dataBlock.getInt(rowId, colId);
+ case LONG:
+ return dataBlock.getLong(rowId, colId);
+ case FLOAT:
+ return dataBlock.getFloat(rowId, colId);
+ case DOUBLE:
+ return dataBlock.getDouble(rowId, colId);
+ case BIG_DECIMAL:
+ return dataBlock.getBigDecimal(rowId, colId);
+ case STRING:
+ return dataBlock.getString(rowId, colId);
+ case BYTES:
+ return dataBlock.getBytes(rowId, colId);
+ case OBJECT:
+ return dataBlock.getObject(rowId, colId);
+ case BOOLEAN_ARRAY:
+ case INT_ARRAY:
+ return dataBlock.getIntArray(rowId, colId);
+ case TIMESTAMP_ARRAY:
+ case LONG_ARRAY:
+ return dataBlock.getLongArray(rowId, colId);
+ case FLOAT_ARRAY:
+ return dataBlock.getFloatArray(rowId, colId);
+ case DOUBLE_ARRAY:
+ return dataBlock.getDoubleArray(rowId, colId);
+ case BYTES_ARRAY:
+ case STRING_ARRAY:
+ return dataBlock.getStringArray(rowId, colId);
+ default:
+ throw new UnsupportedOperationException("Can't retrieve data for column type: " + columnDataType);
+ }
+ }
+
+ public static List<Object[]> getRandomRows(DataSchema dataSchema, int numRows) {
+ List<Object[]> rows = new ArrayList<>(numRows);
+ for (int i = 0; i < numRows; i++) {
+ rows.add(getRandomRow(dataSchema));
+ }
+ return rows;
+ }
+
+ public static List<Object[]> getRandomColumnar(DataSchema dataSchema, int numRows) {
+ List<Object[]> rows = getRandomRows(dataSchema, numRows);
+ return convertColumnar(dataSchema, rows);
+ }
+
+ public static List<Object[]> convertColumnar(DataSchema dataSchema, List<Object[]> rows) {
+ final int numRows = rows.size();
+ final int numColumns = dataSchema.getColumnNames().length;
+ List<Object[]> columnars = new ArrayList<>(numColumns);
+ for (int colId = 0; colId < numColumns; colId++) {
+ columnars.add(new Object[numRows]);
+ for (int rowId = 0; rowId < numRows; rowId++) {
+ columnars.get(colId)[rowId] = rows.get(rowId)[colId];
+ }
+ }
+ return columnars;
+ }
+}
diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/service/QueryDispatcherTest.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/service/QueryDispatcherTest.java
new file mode 100644
index 0000000000..9057d1a7ec
--- /dev/null
+++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/service/QueryDispatcherTest.java
@@ -0,0 +1,92 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.query.service;
+
+import com.google.common.collect.Lists;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import org.apache.pinot.query.QueryEnvironment;
+import org.apache.pinot.query.QueryEnvironmentTestUtils;
+import org.apache.pinot.query.planner.PlannerUtils;
+import org.apache.pinot.query.planner.QueryPlan;
+import org.apache.pinot.query.runtime.QueryRunner;
+import org.mockito.Mockito;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.DataProvider;
+import org.testng.annotations.Test;
+
+
+public class QueryDispatcherTest {
+ private static final Random RANDOM_REQUEST_ID_GEN = new Random();
+ private static final int QUERY_SERVER_COUNT = 2;
+ private final Map<Integer, QueryServer> _queryServerMap = new HashMap<>();
+ private final Map<Integer, QueryRunner> _queryRunnerMap = new HashMap<>();
+
+ private QueryEnvironment _queryEnvironment;
+
+ @BeforeClass
+ public void setUp()
+ throws Exception {
+
+ for (int i = 0; i < QUERY_SERVER_COUNT; i++) {
+ int availablePort = QueryEnvironmentTestUtils.getAvailablePort();
+ QueryRunner queryRunner = Mockito.mock(QueryRunner.class);
+ QueryServer queryServer = new QueryServer(availablePort, queryRunner);
+ queryServer.start();
+ _queryServerMap.put(availablePort, queryServer);
+ _queryRunnerMap.put(availablePort, queryRunner);
+ }
+
+ List<Integer> portList = Lists.newArrayList(_queryServerMap.keySet());
+
+ // reducer port doesn't matter; we are testing the worker instance, not gRPC.
+ _queryEnvironment = QueryEnvironmentTestUtils.getQueryEnvironment(1, portList.get(0), portList.get(1));
+ }
+
+ @AfterClass
+ public void tearDown() {
+ for (QueryServer worker : _queryServerMap.values()) {
+ worker.shutdown();
+ }
+ }
+
+ @Test(dataProvider = "testDataWithSqlToCompiledAsWorkerRequest")
+ public void testQueryDispatcherCanSendCorrectPayload(String sql)
+ throws Exception {
+ QueryPlan queryPlan = _queryEnvironment.planQuery(sql);
+ QueryDispatcher dispatcher = new QueryDispatcher();
+ int reducerStageId = dispatcher.submit(RANDOM_REQUEST_ID_GEN.nextLong(), queryPlan);
+ Assert.assertTrue(PlannerUtils.isRootStage(reducerStageId));
+ }
+
+ @DataProvider(name = "testDataWithSqlToCompiledAsWorkerRequest")
+ private Object[][] provideTestSqlToCompiledToWorkerRequest() {
+ return new Object[][] {
+ new Object[]{"SELECT * FROM b"},
+ new Object[]{"SELECT * FROM a"},
+ new Object[]{"SELECT * FROM a JOIN b ON a.col3 = b.col3"},
+ new Object[]{"SELECT a.col1, a.ts, c.col2, c.col3 FROM a JOIN c ON a.col1 = c.col2 "
+ + " WHERE (a.col3 >= 0 OR a.col2 = 'foo') AND c.col3 >= 0"},
+ };
+ }
+}