Posted to commits@pinot.apache.org by ro...@apache.org on 2022/06/08 06:03:07 UTC

[pinot] 08/11: row/columnar compatible block (#8583)

This is an automated email from the ASF dual-hosted git repository.

rongr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git

commit 30e21eb394a9d022a40bca26b7dcf5e4641f688f
Author: Rong Rong <wa...@gmail.com>
AuthorDate: Tue May 10 14:30:46 2022 -0700

    row/columnar compatible block (#8583)
    
    add tests and verify both row/columnar formats
    
    add tests validating all types
    
    fix and add query dispatcher test
    
    Co-authored-by: Rong Rong <ro...@startree.ai>
---
 .../apache/pinot/query/runtime/QueryRunner.java    |  15 +-
 .../pinot/query/runtime/blocks/BaseDataBlock.java  | 697 +++++++++++++++++++++
 .../query/runtime/blocks/ColumnarDataBlock.java    |  96 +++
 .../query/runtime/blocks/DataBlockBuilder.java     | 406 ++++++++++++
 .../pinot/query/runtime/blocks/DataBlockUtils.java | 174 +++++
 .../query/runtime/blocks/DataTableBlockUtils.java  |  71 ---
 .../pinot/query/runtime/blocks/MetadataBlock.java  |  66 ++
 .../pinot/query/runtime/blocks/RowDataBlock.java   |  90 +++
 ...{DataTableBlock.java => TransferableBlock.java} |  37 +-
 .../runtime/executor/WorkerQueryExecutor.java      |  16 +-
 .../query/runtime/operator/HashJoinOperator.java   |  51 +-
 .../runtime/operator/MailboxReceiveOperator.java   |  19 +-
 .../runtime/operator/MailboxSendOperator.java      |  48 +-
 .../pinot/query/service/QueryDispatcher.java       |  54 +-
 .../query/mailbox/GrpcMailboxServiceTest.java      |   7 +-
 .../pinot/query/runtime/QueryRunnerTest.java       |  52 +-
 .../pinot/query/runtime/blocks/DataBlockTest.java  |  80 +++
 .../query/runtime/blocks/DataBlockTestUtils.java   | 181 ++++++
 .../pinot/query/service/QueryDispatcherTest.java   |  92 +++
 19 files changed, 2034 insertions(+), 218 deletions(-)

diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java
index f33cde43c2..e25c6e03b8 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java
@@ -18,6 +18,8 @@
  */
 package org.apache.pinot.query.runtime;
 
+import java.io.IOException;
+import java.nio.ByteBuffer;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ExecutorService;
@@ -32,6 +34,8 @@ import org.apache.pinot.query.mailbox.GrpcMailboxService;
 import org.apache.pinot.query.mailbox.MailboxService;
 import org.apache.pinot.query.planner.StageMetadata;
 import org.apache.pinot.query.planner.stage.MailboxSendNode;
+import org.apache.pinot.query.runtime.blocks.BaseDataBlock;
+import org.apache.pinot.query.runtime.blocks.DataBlockUtils;
 import org.apache.pinot.query.runtime.executor.WorkerQueryExecutor;
 import org.apache.pinot.query.runtime.operator.MailboxSendOperator;
 import org.apache.pinot.query.runtime.plan.DistributedStagePlan;
@@ -94,12 +98,19 @@ public class QueryRunner {
           ServerRequestUtils.constructServerQueryRequest(distributedStagePlan, requestMetadataMap);
 
       // send the data table via mailbox in one-off fashion (e.g. no block-level split, one data table/partition key)
-      DataTable dataTable = _serverExecutor.processQuery(serverQueryRequest, executorService, null);
+      BaseDataBlock dataBlock;
+      try {
+        DataTable dataTable = _serverExecutor.processQuery(serverQueryRequest, executorService, null);
+        // this works because default DataTableImplV3 will have ordinal 0, which maps to ROW(0)
+        dataBlock = DataBlockUtils.getDataBlock(ByteBuffer.wrap(dataTable.toBytes()));
+      } catch (IOException e) {
+        throw new RuntimeException("Unable to convert byte buffer", e);
+      }
 
       MailboxSendNode sendNode = (MailboxSendNode) distributedStagePlan.getStageRoot();
       StageMetadata receivingStageMetadata = distributedStagePlan.getMetadataMap().get(sendNode.getReceiverStageId());
       MailboxSendOperator mailboxSendOperator =
-          new MailboxSendOperator(_mailboxService, dataTable, receivingStageMetadata.getServerInstances(),
+          new MailboxSendOperator(_mailboxService, dataBlock, receivingStageMetadata.getServerInstances(),
               sendNode.getExchangeType(), sendNode.getPartitionKeySelector(), _hostname, _port,
               serverQueryRequest.getRequestId(), sendNode.getStageId());
       mailboxSendOperator.nextBlock();
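
For context on the "ordinal 0, which maps to ROW(0)" comment above: the first int of a serialized block packs
both a version and a block type, with the type ordinal stored above bit 5 (see DataBlockUtils.VERSION_TYPE_SHIFT
and the getDataBlockVersionType() implementations later in this patch). A minimal sketch of the packing and
unpacking, using an illustrative version constant:

    // Sketch only, not part of the commit; mirrors DataBlockUtils.getDataBlock() and ColumnarDataBlock.
    int versionTypeShift = 5;  // same value as DataBlockUtils.VERSION_TYPE_SHIFT
    int version = 1;           // illustrative version constant
    int packed = version + (BaseDataBlock.Type.COLUMNAR.ordinal() << versionTypeShift);

    int decodedVersion = packed & ((1 << versionTypeShift) - 1);                                  // -> 1
    BaseDataBlock.Type decodedType = BaseDataBlock.Type.fromOrdinal(packed >> versionTypeShift);  // -> COLUMNAR

    // A plain DataTableImplV3 payload leads with a small version int whose high bits are all zero,
    // so it decodes to Type.ROW (ordinal 0), which is why wrapping dataTable.toBytes() above works.
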
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/blocks/BaseDataBlock.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/blocks/BaseDataBlock.java
new file mode 100644
index 0000000000..97d007b58b
--- /dev/null
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/blocks/BaseDataBlock.java
@@ -0,0 +1,697 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.query.runtime.blocks;
+
+import com.google.common.primitives.Ints;
+import com.google.common.primitives.Longs;
+import java.io.ByteArrayOutputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.math.BigDecimal;
+import java.nio.ByteBuffer;
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.pinot.common.response.ProcessingException;
+import org.apache.pinot.common.utils.DataSchema;
+import org.apache.pinot.common.utils.DataTable;
+import org.apache.pinot.core.common.ObjectSerDeUtils;
+import org.apache.pinot.core.common.datatable.DataTableUtils;
+import org.apache.pinot.core.query.request.context.ThreadTimer;
+import org.apache.pinot.spi.utils.BigDecimalUtils;
+import org.apache.pinot.spi.utils.ByteArray;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+
+
+/**
+ * Base data block mostly replicating implementation of {@link org.apache.pinot.core.common.datatable.DataTableImplV3}.
+ *
+ * +-----------------------------------------------+
+ * | 13 integers of header:                        |
+ * | VERSION                                       |
+ * | NUM_ROWS                                      |
+ * | NUM_COLUMNS                                   |
+ * | EXCEPTIONS SECTION START OFFSET               |
+ * | EXCEPTIONS SECTION LENGTH                     |
+ * | DICTIONARY_MAP SECTION START OFFSET           |
+ * | DICTIONARY_MAP SECTION LENGTH                 |
+ * | DATA_SCHEMA SECTION START OFFSET              |
+ * | DATA_SCHEMA SECTION LENGTH                    |
+ * | FIXED_SIZE_DATA SECTION START OFFSET          |
+ * | FIXED_SIZE_DATA SECTION LENGTH                |
+ * | VARIABLE_SIZE_DATA SECTION START OFFSET       |
+ * | VARIABLE_SIZE_DATA SECTION LENGTH             |
+ * +-----------------------------------------------+
+ * | EXCEPTIONS SECTION                            |
+ * +-----------------------------------------------+
+ * | DICTIONARY_MAP SECTION                        |
+ * +-----------------------------------------------+
+ * | DATA_SCHEMA SECTION                           |
+ * +-----------------------------------------------+
+ * | FIXED_SIZE_DATA SECTION                       |
+ * +-----------------------------------------------+
+ * | VARIABLE_SIZE_DATA SECTION                    |
+ * +-----------------------------------------------+
+ * | METADATA LENGTH                               |
+ * | METADATA SECTION                              |
+ * +-----------------------------------------------+
+ *
+ * To support both row and columnar data formats, the size of the data payload is exactly the same; the only
+ * difference is the data layout in the FIXED_SIZE_DATA and VARIABLE_SIZE_DATA sections. See each implementation for details.
+ */
+@SuppressWarnings("DuplicatedCode")
+public abstract class BaseDataBlock implements DataTable {
+  protected static final int HEADER_SIZE = Integer.BYTES * 13;
+  // _errCodeToExceptionMap stores exceptions as a map of errorCode->errorMessage
+  protected Map<Integer, String> _errCodeToExceptionMap;
+
+  protected int _numRows;
+  protected int _numColumns;
+  protected DataSchema _dataSchema;
+  protected Map<String, Map<Integer, String>> _dictionaryMap;
+  protected byte[] _fixedSizeDataBytes;
+  protected ByteBuffer _fixedSizeData;
+  protected byte[] _variableSizeDataBytes;
+  protected ByteBuffer _variableSizeData;
+  protected Map<String, String> _metadata;
+
+  /**
+   * Construct a base data block.
+   * @param numRows number of rows in the block
+   * @param dataSchema schema of the data in the block
+   * @param dictionaryMap dictionary encoding map
+   * @param fixedSizeDataBytes byte[] for fixed-size columns.
+   * @param variableSizeDataBytes byte[] for variable-length columns (arrays).
+   */
+  public BaseDataBlock(int numRows, DataSchema dataSchema, Map<String, Map<Integer, String>> dictionaryMap,
+      byte[] fixedSizeDataBytes, byte[] variableSizeDataBytes) {
+    _numRows = numRows;
+    _numColumns = dataSchema.size();
+    _dataSchema = dataSchema;
+    _dictionaryMap = dictionaryMap;
+    _fixedSizeDataBytes = fixedSizeDataBytes;
+    _fixedSizeData = ByteBuffer.wrap(fixedSizeDataBytes);
+    _variableSizeDataBytes = variableSizeDataBytes;
+    _variableSizeData = ByteBuffer.wrap(variableSizeDataBytes);
+    _metadata = new HashMap<>();
+    _errCodeToExceptionMap = new HashMap<>();
+  }
+
+  /**
+   * Construct an empty data block.
+   */
+  public BaseDataBlock() {
+    _numRows = 0;
+    _numColumns = 0;
+    _dataSchema = null;
+    _dictionaryMap = null;
+    _fixedSizeDataBytes = null;
+    _fixedSizeData = null;
+    _variableSizeDataBytes = null;
+    _variableSizeData = null;
+    _metadata = new HashMap<>();
+    _errCodeToExceptionMap = new HashMap<>();
+  }
+
+  public BaseDataBlock(ByteBuffer byteBuffer)
+      throws IOException {
+    // Read header.
+    _numRows = byteBuffer.getInt();
+    _numColumns = byteBuffer.getInt();
+    int exceptionsStart = byteBuffer.getInt();
+    int exceptionsLength = byteBuffer.getInt();
+    int dictionaryMapStart = byteBuffer.getInt();
+    int dictionaryMapLength = byteBuffer.getInt();
+    int dataSchemaStart = byteBuffer.getInt();
+    int dataSchemaLength = byteBuffer.getInt();
+    int fixedSizeDataStart = byteBuffer.getInt();
+    int fixedSizeDataLength = byteBuffer.getInt();
+    int variableSizeDataStart = byteBuffer.getInt();
+    int variableSizeDataLength = byteBuffer.getInt();
+
+
+    // Read exceptions.
+    if (exceptionsLength != 0) {
+      byteBuffer.position(exceptionsStart);
+      _errCodeToExceptionMap = deserializeExceptions(byteBuffer);
+    } else {
+      _errCodeToExceptionMap = new HashMap<>();
+    }
+
+    // Read dictionary.
+    if (dictionaryMapLength != 0) {
+      byteBuffer.position(dictionaryMapStart);
+      _dictionaryMap = deserializeDictionaryMap(byteBuffer);
+    } else {
+      _dictionaryMap = null;
+    }
+
+    // Read data schema.
+    if (dataSchemaLength != 0) {
+      byteBuffer.position(dataSchemaStart);
+      _dataSchema = DataSchema.fromBytes(byteBuffer);
+    } else {
+      _dataSchema = null;
+    }
+
+    // Read fixed size data.
+    if (fixedSizeDataLength != 0) {
+      _fixedSizeDataBytes = new byte[fixedSizeDataLength];
+      byteBuffer.position(fixedSizeDataStart);
+      byteBuffer.get(_fixedSizeDataBytes);
+      _fixedSizeData = ByteBuffer.wrap(_fixedSizeDataBytes);
+    } else {
+      _fixedSizeDataBytes = null;
+      _fixedSizeData = null;
+    }
+
+    // Read variable size data.
+    if (variableSizeDataLength != 0) {
+      _variableSizeDataBytes = new byte[variableSizeDataLength];
+      byteBuffer.position(variableSizeDataStart);
+      byteBuffer.get(_variableSizeDataBytes);
+      _variableSizeData = ByteBuffer.wrap(_variableSizeDataBytes);
+    } else {
+      _variableSizeDataBytes = null;
+      _variableSizeData = null;
+    }
+
+    // Read metadata.
+    int metadataLength = byteBuffer.getInt();
+    if (metadataLength != 0) {
+      _metadata = deserializeMetadata(byteBuffer);
+    }
+  }
+
+  /**
+   * Return the serialized int that encodes both the data block version and type.
+   * @return the packed version/type int
+   */
+  protected abstract int getDataBlockVersionType();
+
+  /**
+   * Position the {@code _fixedSizeData} buffer at the value for the given row/column ID.
+   * @param rowId row ID
+   * @param colId column ID
+   */
+  protected abstract void positionCursorInFixSizedBuffer(int rowId, int colId);
+
+  /**
+   * Position the {@code _variableSizeData} buffer at the value for the given row/column ID and return the
+   * length of bytes to extract from the variable size buffer.
+   *
+   * @param rowId row ID
+   * @param colId column ID
+   * @return the length in bytes to extract from the variable size buffer.
+   */
+  protected abstract int positionCursorInVariableBuffer(int rowId, int colId);
+
+  @Override
+  public Map<String, String> getMetadata() {
+    return _metadata;
+  }
+
+  @Override
+  public DataSchema getDataSchema() {
+    return _dataSchema;
+  }
+
+  @Override
+  public int getNumberOfRows() {
+    return _numRows;
+  }
+
+  // --------------------------------------------------------------------------
+  // Fixed sized element access.
+  // --------------------------------------------------------------------------
+
+  @Override
+  public int getInt(int rowId, int colId) {
+    positionCursorInFixSizedBuffer(rowId, colId);
+    return _fixedSizeData.getInt();
+  }
+
+  @Override
+  public long getLong(int rowId, int colId) {
+    positionCursorInFixSizedBuffer(rowId, colId);
+    return _fixedSizeData.getLong();
+  }
+
+  @Override
+  public float getFloat(int rowId, int colId) {
+    positionCursorInFixSizedBuffer(rowId, colId);
+    return _fixedSizeData.getFloat();
+  }
+
+  @Override
+  public double getDouble(int rowId, int colId) {
+    positionCursorInFixSizedBuffer(rowId, colId);
+    return _fixedSizeData.getDouble();
+  }
+
+  @Override
+  public BigDecimal getBigDecimal(int rowId, int colId) {
+    int size = positionCursorInVariableBuffer(rowId, colId);
+    ByteBuffer byteBuffer = _variableSizeData.slice();
+    byteBuffer.limit(size);
+    return BigDecimalUtils.deserialize(byteBuffer);
+  }
+
+  @Override
+  public String getString(int rowId, int colId) {
+    positionCursorInFixSizedBuffer(rowId, colId);
+    int dictId = _fixedSizeData.getInt();
+    return _dictionaryMap.get(_dataSchema.getColumnName(colId)).get(dictId);
+  }
+
+  @Override
+  public ByteArray getBytes(int rowId, int colId) {
+    int size = positionCursorInVariableBuffer(rowId, colId);
+    byte[] buffer = new byte[size];
+    _variableSizeData.get(buffer);
+    return new ByteArray(buffer);
+  }
+
+  // --------------------------------------------------------------------------
+  // Variable sized element access.
+  // --------------------------------------------------------------------------
+
+  @Override
+  public <T> T getObject(int rowId, int colId) {
+    int size = positionCursorInVariableBuffer(rowId, colId);
+    int objectTypeValue = _variableSizeData.getInt();
+    ByteBuffer byteBuffer = _variableSizeData.slice();
+    byteBuffer.limit(size);
+    return ObjectSerDeUtils.deserialize(byteBuffer, objectTypeValue);
+  }
+
+  @Override
+  public int[] getIntArray(int rowId, int colId) {
+    int length = positionCursorInVariableBuffer(rowId, colId);
+    int[] ints = new int[length];
+    for (int i = 0; i < length; i++) {
+      ints[i] = _variableSizeData.getInt();
+    }
+    return ints;
+  }
+
+  @Override
+  public long[] getLongArray(int rowId, int colId) {
+    int length = positionCursorInVariableBuffer(rowId, colId);
+    long[] longs = new long[length];
+    for (int i = 0; i < length; i++) {
+      longs[i] = _variableSizeData.getLong();
+    }
+    return longs;
+  }
+
+  @Override
+  public float[] getFloatArray(int rowId, int colId) {
+    int length = positionCursorInVariableBuffer(rowId, colId);
+    float[] floats = new float[length];
+    for (int i = 0; i < length; i++) {
+      floats[i] = _variableSizeData.getFloat();
+    }
+    return floats;
+  }
+
+  @Override
+  public double[] getDoubleArray(int rowId, int colId) {
+    int length = positionCursorInVariableBuffer(rowId, colId);
+    double[] doubles = new double[length];
+    for (int i = 0; i < length; i++) {
+      doubles[i] = _variableSizeData.getDouble();
+    }
+    return doubles;
+  }
+
+  @Override
+  public String[] getStringArray(int rowId, int colId) {
+    int length = positionCursorInVariableBuffer(rowId, colId);
+    String[] strings = new String[length];
+    Map<Integer, String> dictionary = _dictionaryMap.get(_dataSchema.getColumnName(colId));
+    for (int i = 0; i < length; i++) {
+      strings[i] = dictionary.get(_variableSizeData.getInt());
+    }
+    return strings;
+  }
+
+  // --------------------------------------------------------------------------
+  // Ser/De and exception handling
+  // --------------------------------------------------------------------------
+
+  /**
+   * Helper method to serialize dictionary map.
+   */
+  protected byte[] serializeDictionaryMap()
+      throws IOException {
+    ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
+    DataOutputStream dataOutputStream = new DataOutputStream(byteArrayOutputStream);
+
+    dataOutputStream.writeInt(_dictionaryMap.size());
+    for (Map.Entry<String, Map<Integer, String>> dictionaryMapEntry : _dictionaryMap.entrySet()) {
+      String columnName = dictionaryMapEntry.getKey();
+      Map<Integer, String> dictionary = dictionaryMapEntry.getValue();
+      byte[] bytes = columnName.getBytes(UTF_8);
+      dataOutputStream.writeInt(bytes.length);
+      dataOutputStream.write(bytes);
+      dataOutputStream.writeInt(dictionary.size());
+
+      for (Map.Entry<Integer, String> dictionaryEntry : dictionary.entrySet()) {
+        dataOutputStream.writeInt(dictionaryEntry.getKey());
+        byte[] valueBytes = dictionaryEntry.getValue().getBytes(UTF_8);
+        dataOutputStream.writeInt(valueBytes.length);
+        dataOutputStream.write(valueBytes);
+      }
+    }
+
+    return byteArrayOutputStream.toByteArray();
+  }
+
+  /**
+   * Helper method to deserialize dictionary map.
+   */
+  protected Map<String, Map<Integer, String>> deserializeDictionaryMap(ByteBuffer buffer)
+      throws IOException {
+    int numDictionaries = buffer.getInt();
+    Map<String, Map<Integer, String>> dictionaryMap = new HashMap<>(numDictionaries);
+
+    for (int i = 0; i < numDictionaries; i++) {
+      String column = DataTableUtils.decodeString(buffer);
+      int dictionarySize = buffer.getInt();
+      Map<Integer, String> dictionary = new HashMap<>(dictionarySize);
+      for (int j = 0; j < dictionarySize; j++) {
+        int key = buffer.getInt();
+        String value = DataTableUtils.decodeString(buffer);
+        dictionary.put(key, value);
+      }
+      dictionaryMap.put(column, dictionary);
+    }
+
+    return dictionaryMap;
+  }
+
+  @Override
+  public void addException(ProcessingException processingException) {
+    _errCodeToExceptionMap.put(processingException.getErrorCode(), processingException.getMessage());
+  }
+
+  @Override
+  public void addException(int errCode, String errMsg) {
+    _errCodeToExceptionMap.put(errCode, errMsg);
+  }
+
+  @Override
+  public Map<Integer, String> getExceptions() {
+    return _errCodeToExceptionMap;
+  }
+
+  @Override
+  public byte[] toBytes()
+      throws IOException {
+    ThreadTimer threadTimer = new ThreadTimer();
+
+    ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
+    DataOutputStream dataOutputStream = new DataOutputStream(byteArrayOutputStream);
+    writeLeadingSections(dataOutputStream);
+
+    // Add table serialization time metadata if thread timer is enabled.
+    if (ThreadTimer.isThreadCpuTimeMeasurementEnabled()) {
+      long responseSerializationCpuTimeNs = threadTimer.getThreadTimeNs();
+      getMetadata().put(MetadataKey.RESPONSE_SER_CPU_TIME_NS.getName(), String.valueOf(responseSerializationCpuTimeNs));
+    }
+
+    // Write metadata: length followed by actual metadata bytes.
+    // NOTE: We ignore metadata serialization time in "responseSerializationCpuTimeNs" as it's negligible, while
+    // accounting for it would add a lot of code complexity.
+    byte[] metadataBytes = serializeMetadata();
+    dataOutputStream.writeInt(metadataBytes.length);
+    dataOutputStream.write(metadataBytes);
+
+    return byteArrayOutputStream.toByteArray();
+  }
+
+  private void writeLeadingSections(DataOutputStream dataOutputStream)
+      throws IOException {
+    dataOutputStream.writeInt(getDataBlockVersionType());
+    dataOutputStream.writeInt(_numRows);
+    dataOutputStream.writeInt(_numColumns);
+    int dataOffset = HEADER_SIZE;
+
+    // Write exceptions section offset(START|SIZE).
+    dataOutputStream.writeInt(dataOffset);
+    byte[] exceptionsBytes;
+    exceptionsBytes = serializeExceptions();
+    dataOutputStream.writeInt(exceptionsBytes.length);
+    dataOffset += exceptionsBytes.length;
+
+    // Write dictionary map section offset(START|SIZE).
+    dataOutputStream.writeInt(dataOffset);
+    byte[] dictionaryMapBytes = null;
+    if (_dictionaryMap != null) {
+      dictionaryMapBytes = serializeDictionaryMap();
+      dataOutputStream.writeInt(dictionaryMapBytes.length);
+      dataOffset += dictionaryMapBytes.length;
+    } else {
+      dataOutputStream.writeInt(0);
+    }
+
+    // Write data schema section offset(START|SIZE).
+    dataOutputStream.writeInt(dataOffset);
+    byte[] dataSchemaBytes = null;
+    if (_dataSchema != null) {
+      dataSchemaBytes = _dataSchema.toBytes();
+      dataOutputStream.writeInt(dataSchemaBytes.length);
+      dataOffset += dataSchemaBytes.length;
+    } else {
+      dataOutputStream.writeInt(0);
+    }
+
+    // Write fixed size data section offset(START|SIZE).
+    dataOutputStream.writeInt(dataOffset);
+    if (_fixedSizeDataBytes != null) {
+      dataOutputStream.writeInt(_fixedSizeDataBytes.length);
+      dataOffset += _fixedSizeDataBytes.length;
+    } else {
+      dataOutputStream.writeInt(0);
+    }
+
+    // Write variable size data section offset(START|SIZE).
+    dataOutputStream.writeInt(dataOffset);
+    if (_variableSizeDataBytes != null) {
+      dataOutputStream.writeInt(_variableSizeDataBytes.length);
+    } else {
+      dataOutputStream.writeInt(0);
+    }
+
+    // Write actual data.
+    // Write exceptions bytes.
+    dataOutputStream.write(exceptionsBytes);
+    // Write dictionary map bytes.
+    if (dictionaryMapBytes != null) {
+      dataOutputStream.write(dictionaryMapBytes);
+    }
+    // Write data schema bytes.
+    if (dataSchemaBytes != null) {
+      dataOutputStream.write(dataSchemaBytes);
+    }
+    // Write fixed size data bytes.
+    if (_fixedSizeDataBytes != null) {
+      dataOutputStream.write(_fixedSizeDataBytes);
+    }
+    // Write variable size data bytes.
+    if (_variableSizeDataBytes != null) {
+      dataOutputStream.write(_variableSizeDataBytes);
+    }
+  }
+
+  /**
+   * Serialize metadata section to bytes.
+   * Format of the bytes looks like:
+ * [numEntries, bytesOfKV1, bytesOfKV2, bytesOfKV3]
+   * For each KV pair:
+   * - if the value type is String, encode it as: [enumKeyOrdinal, valueLength, Utf8EncodedValue].
+   * - if the value type is int, encode it as: [enumKeyOrdinal, bigEndianRepresentationOfIntValue]
+   * - if the value type is long, encode it as: [enumKeyOrdinal, bigEndianRepresentationOfLongValue]
+   *
+ * Unlike V2, where numeric metadata values (int and long) are encoded as UTF-8 strings in the wire format,
+ * V3 uses a big-endian representation.
+   */
+  private byte[] serializeMetadata()
+      throws IOException {
+    ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
+    DataOutputStream dataOutputStream = new DataOutputStream(byteArrayOutputStream);
+
+    dataOutputStream.writeInt(_metadata.size());
+
+    for (Map.Entry<String, String> entry : _metadata.entrySet()) {
+      MetadataKey key = MetadataKey.getByName(entry.getKey());
+      // Ignore unknown keys.
+      if (key == null) {
+        continue;
+      }
+      String value = entry.getValue();
+      dataOutputStream.writeInt(key.ordinal());
+      if (key.getValueType() == MetadataValueType.INT) {
+        dataOutputStream.write(Ints.toByteArray(Integer.parseInt(value)));
+      } else if (key.getValueType() == MetadataValueType.LONG) {
+        dataOutputStream.write(Longs.toByteArray(Long.parseLong(value)));
+      } else {
+        byte[] valueBytes = value.getBytes(UTF_8);
+        dataOutputStream.writeInt(valueBytes.length);
+        dataOutputStream.write(valueBytes);
+      }
+    }
+
+    return byteArrayOutputStream.toByteArray();
+  }
+
+  /**
+   * Even though the wire format of V3 uses UTF-8 for string/bytes and big-endian for numeric values,
+   * the in-memory representation is STRING based for processing the metadata before serialization
+   * (by the server as it adds the statistics in metadata) and after deserialization (by the broker as it receives
+   * DataTable from each server and aggregates the values).
+ * This keeps the consumers of the Map<String, String> getMetadata() API happy with the V3 implementation
+ * by converting the values internally.
+ *
+ * This method uses relative operations on the ByteBuffer and expects the buffer's position to be set correctly.
+   */
+  private Map<String, String> deserializeMetadata(ByteBuffer buffer)
+      throws IOException {
+    int numEntries = buffer.getInt();
+    Map<String, String> metadata = new HashMap<>();
+    for (int i = 0; i < numEntries; i++) {
+      int keyId = buffer.getInt();
+      MetadataKey key = MetadataKey.getByOrdinal(keyId);
+      // Ignore unknown keys.
+      if (key == null) {
+        continue;
+      }
+      if (key.getValueType() == MetadataValueType.INT) {
+        String value = "" + buffer.getInt();
+        metadata.put(key.getName(), value);
+      } else if (key.getValueType() == MetadataValueType.LONG) {
+        String value = "" + buffer.getLong();
+        metadata.put(key.getName(), value);
+      } else {
+        String value = DataTableUtils.decodeString(buffer);
+        metadata.put(key.getName(), value);
+      }
+    }
+    return metadata;
+  }
+
+  private byte[] serializeExceptions()
+      throws IOException {
+    ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
+    DataOutputStream dataOutputStream = new DataOutputStream(byteArrayOutputStream);
+
+    dataOutputStream.writeInt(_errCodeToExceptionMap.size());
+
+    for (Map.Entry<Integer, String> entry : _errCodeToExceptionMap.entrySet()) {
+      int key = entry.getKey();
+      String value = entry.getValue();
+      byte[] valueBytes = value.getBytes(UTF_8);
+      dataOutputStream.writeInt(key);
+      dataOutputStream.writeInt(valueBytes.length);
+      dataOutputStream.write(valueBytes);
+    }
+
+    return byteArrayOutputStream.toByteArray();
+  }
+
+  private Map<Integer, String> deserializeExceptions(ByteBuffer buffer)
+      throws IOException {
+    int numExceptions = buffer.getInt();
+    Map<Integer, String> exceptions = new HashMap<>(numExceptions);
+    for (int i = 0; i < numExceptions; i++) {
+      int errCode = buffer.getInt();
+      String errMessage = DataTableUtils.decodeString(buffer);
+      exceptions.put(errCode, errMessage);
+    }
+    return exceptions;
+  }
+
+  @Override
+  public String toString() {
+    if (_dataSchema == null) {
+      return _metadata.toString();
+    }
+
+    StringBuilder stringBuilder = new StringBuilder();
+    stringBuilder.append(_dataSchema.toString()).append('\n');
+    stringBuilder.append("numRows: ").append(_numRows).append('\n');
+
+    DataSchema.ColumnDataType[] storedColumnDataTypes = _dataSchema.getStoredColumnDataTypes();
+    _fixedSizeData.position(0);
+    for (int rowId = 0; rowId < _numRows; rowId++) {
+      for (int colId = 0; colId < _numColumns; colId++) {
+        switch (storedColumnDataTypes[colId]) {
+          case INT:
+            stringBuilder.append(_fixedSizeData.getInt());
+            break;
+          case LONG:
+            stringBuilder.append(_fixedSizeData.getLong());
+            break;
+          case FLOAT:
+            stringBuilder.append(_fixedSizeData.getFloat());
+            break;
+          case DOUBLE:
+            stringBuilder.append(_fixedSizeData.getDouble());
+            break;
+          case STRING:
+            stringBuilder.append(_fixedSizeData.getInt());
+            break;
+          // Object and array.
+          default:
+            stringBuilder.append(String.format("(%s:%s)", _fixedSizeData.getInt(), _fixedSizeData.getInt()));
+            break;
+        }
+        stringBuilder.append("\t");
+      }
+      stringBuilder.append("\n");
+    }
+    return stringBuilder.toString();
+  }
+
+  public enum Type {
+    ROW(0),
+    COLUMNAR(1),
+    METADATA(2);
+
+    private final int _ordinal;
+
+    Type(int ordinal) {
+      _ordinal = ordinal;
+    }
+
+    public static Type fromOrdinal(int ordinal) {
+      switch (ordinal) {
+        case 0:
+          return ROW;
+        case 1:
+          return COLUMNAR;
+        case 2:
+          return METADATA;
+        default:
+          throw new IllegalArgumentException("Invalid ordinal: " + ordinal);
+      }
+    }
+  }
+}
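
A worked example of the 13-int header described at the top of BaseDataBlock (illustrative numbers, not part
of the commit): HEADER_SIZE is 13 * Integer.BYTES = 52, and writeLeadingSections() assigns each section's
START as the running offset after the previous section. Assuming 10 bytes of exceptions, no dictionary,
a 24-byte data schema and 400 bytes of fixed-size data:

    // section              START   LENGTH
    // EXCEPTIONS             52       10
    // DICTIONARY_MAP         62        0   (null dictionary writes length 0)
    // DATA_SCHEMA            62       24
    // FIXED_SIZE_DATA        86      400
    // VARIABLE_SIZE_DATA    486      ...
    // METADATA is appended last as [length][bytes] after the variable-size section.
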
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/blocks/ColumnarDataBlock.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/blocks/ColumnarDataBlock.java
new file mode 100644
index 0000000000..8510043251
--- /dev/null
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/blocks/ColumnarDataBlock.java
@@ -0,0 +1,96 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.query.runtime.blocks;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Map;
+import org.apache.pinot.common.utils.DataSchema;
+
+
+/**
+ * Column-wise data block. It stores data in column-major format.
+ */
+public class ColumnarDataBlock extends BaseDataBlock {
+  private static final int VERSION = 1;
+  protected int[] _cumulativeColumnOffsetSizeInBytes;
+  protected int[] _columnSizeInBytes;
+
+  public ColumnarDataBlock() {
+    super();
+  }
+
+  public ColumnarDataBlock(int numRows, DataSchema dataSchema, Map<String, Map<Integer, String>> dictionaryMap,
+      byte[] fixedSizeDataBytes, byte[] variableSizeDataBytes) {
+    super(numRows, dataSchema, dictionaryMap, fixedSizeDataBytes, variableSizeDataBytes);
+    computeBlockObjectConstants();
+  }
+
+  public ColumnarDataBlock(ByteBuffer byteBuffer)
+      throws IOException {
+    super(byteBuffer);
+    computeBlockObjectConstants();
+  }
+
+  protected void computeBlockObjectConstants() {
+    if (_dataSchema != null) {
+      _cumulativeColumnOffsetSizeInBytes = new int[_numColumns];
+      _columnSizeInBytes = new int[_numColumns];
+      DataBlockUtils.computeColumnSizeInBytes(_dataSchema, _columnSizeInBytes);
+      int cumulativeColumnOffset = 0;
+      for (int i = 0; i < _numColumns; i++) {
+        _cumulativeColumnOffsetSizeInBytes[i] = cumulativeColumnOffset;
+        cumulativeColumnOffset += _columnSizeInBytes[i] * _numRows;
+      }
+    }
+  }
+
+  @Override
+  protected int getDataBlockVersionType() {
+    return VERSION + (Type.COLUMNAR.ordinal() << DataBlockUtils.VERSION_TYPE_SHIFT);
+  }
+
+  @Override
+  protected void positionCursorInFixSizedBuffer(int rowId, int colId) {
+    int position = _cumulativeColumnOffsetSizeInBytes[colId] + _columnSizeInBytes[colId] * rowId;
+    _fixedSizeData.position(position);
+  }
+
+  @Override
+  protected int positionCursorInVariableBuffer(int rowId, int colId) {
+    positionCursorInFixSizedBuffer(rowId, colId);
+    _variableSizeData.position(_fixedSizeData.getInt());
+    return _fixedSizeData.getInt();
+  }
+
+  @Override
+  public ColumnarDataBlock toMetadataOnlyDataTable() {
+    ColumnarDataBlock metadataOnlyDataTable = new ColumnarDataBlock();
+    metadataOnlyDataTable._metadata.putAll(_metadata);
+    metadataOnlyDataTable._errCodeToExceptionMap.putAll(_errCodeToExceptionMap);
+    return metadataOnlyDataTable;
+  }
+
+  @Override
+  public ColumnarDataBlock toDataOnlyDataTable() {
+    return new ColumnarDataBlock(_numRows, _dataSchema, _dictionaryMap, _fixedSizeDataBytes, _variableSizeDataBytes);
+  }
+
+  // TODO: add whole-column access methods.
+}
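
A quick worked example of the columnar cursor math above (illustrative numbers only): for a schema of
(INT, LONG) with 100 rows, computeBlockObjectConstants() yields _columnSizeInBytes = [4, 8] and
_cumulativeColumnOffsetSizeInBytes = [0, 400], i.e. the fixed-size buffer holds all 100 INT values first,
followed by all 100 LONG values. The cell (rowId = 5, colId = 1) is therefore read at position
400 + 8 * 5 = 440, whereas a row-major layout would place it at rowSizeInBytes * 5 + columnOffset[1].
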
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/blocks/DataBlockBuilder.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/blocks/DataBlockBuilder.java
new file mode 100644
index 0000000000..0c456c1d85
--- /dev/null
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/blocks/DataBlockBuilder.java
@@ -0,0 +1,406 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.query.runtime.blocks;
+
+import java.io.ByteArrayOutputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.math.BigDecimal;
+import java.nio.ByteBuffer;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.pinot.common.utils.DataSchema;
+import org.apache.pinot.core.common.ObjectSerDeUtils;
+import org.apache.pinot.core.common.datatable.DataTableUtils;
+import org.apache.pinot.spi.utils.ArrayCopyUtils;
+import org.apache.pinot.spi.utils.BigDecimalUtils;
+import org.apache.pinot.spi.utils.ByteArray;
+
+
+public class DataBlockBuilder {
+  private final DataSchema _dataSchema;
+  private final BaseDataBlock.Type _blockType;
+  private final DataSchema.ColumnDataType[] _columnDataType;
+
+  private int[] _columnOffsets;
+  private int _rowSizeInBytes;
+  private int[] _cumulativeColumnOffsetSizeInBytes;
+  private int[] _columnSizeInBytes;
+
+  private int _numRows;
+  private int _numColumns;
+
+  private final Map<String, Map<String, Integer>> _dictionaryMap = new HashMap<>();
+  private final Map<String, Map<Integer, String>> _reverseDictionaryMap = new HashMap<>();
+  private final ByteArrayOutputStream _fixedSizeDataByteArrayOutputStream = new ByteArrayOutputStream();
+  private final ByteArrayOutputStream _variableSizeDataByteArrayOutputStream = new ByteArrayOutputStream();
+  private final DataOutputStream _variableSizeDataOutputStream =
+      new DataOutputStream(_variableSizeDataByteArrayOutputStream);
+
+
+  private ByteBuffer _currentRowDataByteBuffer;
+
+  private DataBlockBuilder(DataSchema dataSchema, BaseDataBlock.Type blockType) {
+    _dataSchema = dataSchema;
+    _columnDataType = dataSchema.getStoredColumnDataTypes();
+    _blockType = blockType;
+    _numColumns = dataSchema.size();
+    if (_blockType == BaseDataBlock.Type.COLUMNAR) {
+      _cumulativeColumnOffsetSizeInBytes = new int[_numColumns];
+      _columnSizeInBytes = new int[_numColumns];
+      DataBlockUtils.computeColumnSizeInBytes(_dataSchema, _columnSizeInBytes);
+      int cumulativeColumnOffset = 0;
+      for (int i = 0; i < _numColumns; i++) {
+        _cumulativeColumnOffsetSizeInBytes[i] = cumulativeColumnOffset;
+        cumulativeColumnOffset += _columnSizeInBytes[i] * _numRows;
+      }
+    } else if (_blockType == BaseDataBlock.Type.ROW) {
+      _columnOffsets = new int[_numColumns];
+      _rowSizeInBytes = DataTableUtils.computeColumnOffsets(dataSchema, _columnOffsets);
+    }
+  }
+
+  public static RowDataBlock buildFromRows(List<Object[]> rows, DataSchema dataSchema)
+      throws IOException {
+    DataBlockBuilder rowBuilder = new DataBlockBuilder(dataSchema, BaseDataBlock.Type.ROW);
+    rowBuilder._numRows = rows.size();
+    for (Object[] row : rows) {
+      ByteBuffer byteBuffer = ByteBuffer.allocate(rowBuilder._rowSizeInBytes);
+      for (int i = 0; i < rowBuilder._numColumns; i++) {
+        Object value = row[i];
+        switch (rowBuilder._columnDataType[i]) {
+          // Single-value column
+          case INT:
+            byteBuffer.putInt(((Number) value).intValue());
+            break;
+          case LONG:
+            byteBuffer.putLong(((Number) value).longValue());
+            break;
+          case FLOAT:
+            byteBuffer.putFloat(((Number) value).floatValue());
+            break;
+          case DOUBLE:
+            byteBuffer.putDouble(((Number) value).doubleValue());
+            break;
+          case BIG_DECIMAL:
+            setColumn(rowBuilder, byteBuffer, (BigDecimal) value);
+            break;
+          case STRING:
+            setColumn(rowBuilder, byteBuffer, i, (String) value);
+            break;
+          case BYTES:
+            setColumn(rowBuilder, byteBuffer, (ByteArray) value);
+            break;
+          case OBJECT:
+            setColumn(rowBuilder, byteBuffer, value);
+            break;
+          // Multi-value column
+          case BOOLEAN_ARRAY:
+          case INT_ARRAY:
+            setColumn(rowBuilder, byteBuffer, (int[]) value);
+            break;
+          case TIMESTAMP_ARRAY:
+          case LONG_ARRAY:
+            // LONG_ARRAY type covers INT_ARRAY and LONG_ARRAY
+            if (value instanceof int[]) {
+              int[] ints = (int[]) value;
+              int length = ints.length;
+              long[] longs = new long[length];
+              ArrayCopyUtils.copy(ints, longs, length);
+              setColumn(rowBuilder, byteBuffer, longs);
+            } else {
+              setColumn(rowBuilder, byteBuffer, (long[]) value);
+            }
+            break;
+          case FLOAT_ARRAY:
+            setColumn(rowBuilder, byteBuffer, (float[]) value);
+            break;
+          case DOUBLE_ARRAY:
+            // DOUBLE_ARRAY type covers INT_ARRAY, LONG_ARRAY, FLOAT_ARRAY and DOUBLE_ARRAY
+            if (value instanceof int[]) {
+              int[] ints = (int[]) value;
+              int length = ints.length;
+              double[] doubles = new double[length];
+              ArrayCopyUtils.copy(ints, doubles, length);
+              setColumn(rowBuilder, byteBuffer, doubles);
+            } else if (value instanceof long[]) {
+              long[] longs = (long[]) value;
+              int length = longs.length;
+              double[] doubles = new double[length];
+              ArrayCopyUtils.copy(longs, doubles, length);
+              setColumn(rowBuilder, byteBuffer, doubles);
+            } else if (value instanceof float[]) {
+              float[] floats = (float[]) value;
+              int length = floats.length;
+              double[] doubles = new double[length];
+              ArrayCopyUtils.copy(floats, doubles, length);
+              setColumn(rowBuilder, byteBuffer, doubles);
+            } else {
+              setColumn(rowBuilder, byteBuffer, (double[]) value);
+            }
+            break;
+          case BYTES_ARRAY:
+          case STRING_ARRAY:
+            setColumn(rowBuilder, byteBuffer, i, (String[]) value);
+            break;
+          default:
+            throw new IllegalStateException(String.format(
+                "Unsupported data type: %s for column: %s", rowBuilder._columnDataType[i],
+                rowBuilder._dataSchema.getColumnName(i)));
+        }
+      }
+      rowBuilder._fixedSizeDataByteArrayOutputStream.write(byteBuffer.array(), 0, byteBuffer.position());
+    }
+    return buildRowBlock(rowBuilder);
+  }
+
+  public static ColumnarDataBlock buildFromColumns(List<Object[]> columns, DataSchema dataSchema)
+      throws IOException {
+    DataBlockBuilder columnarBuilder = new DataBlockBuilder(dataSchema, BaseDataBlock.Type.COLUMNAR);
+    for (int i = 0; i < columns.size(); i++) {
+      Object[] column = columns.get(i);
+      columnarBuilder._numRows = column.length;
+      ByteBuffer byteBuffer = ByteBuffer.allocate(columnarBuilder._numRows * columnarBuilder._columnSizeInBytes[i]);
+      switch (columnarBuilder._columnDataType[i]) {
+        // Single-value column
+        case INT:
+          for (Object value : column) {
+            byteBuffer.putInt(((Number) value).intValue());
+          }
+          break;
+        case LONG:
+          for (Object value : column) {
+            byteBuffer.putLong(((Number) value).longValue());
+          }
+          break;
+        case FLOAT:
+          for (Object value : column) {
+            byteBuffer.putFloat(((Number) value).floatValue());
+          }
+          break;
+        case DOUBLE:
+          for (Object value : column) {
+            byteBuffer.putDouble(((Number) value).doubleValue());
+          }
+          break;
+        case BIG_DECIMAL:
+          for (Object value : column) {
+            setColumn(columnarBuilder, byteBuffer, (BigDecimal) value);
+          }
+          break;
+        case STRING:
+          for (Object value : column) {
+            setColumn(columnarBuilder, byteBuffer, i, (String) value);
+          }
+          break;
+        case BYTES:
+          for (Object value : column) {
+            setColumn(columnarBuilder, byteBuffer, (ByteArray) value);
+          }
+          break;
+        case OBJECT:
+          for (Object value : column) {
+            setColumn(columnarBuilder, byteBuffer, value);
+          }
+          break;
+        // Multi-value column
+        case BOOLEAN_ARRAY:
+        case INT_ARRAY:
+          for (Object value : column) {
+            setColumn(columnarBuilder, byteBuffer, (int[]) value);
+          }
+          break;
+        case TIMESTAMP_ARRAY:
+        case LONG_ARRAY:
+          for (Object value : column) {
+            if (value instanceof int[]) {
+              // LONG_ARRAY type covers INT_ARRAY and LONG_ARRAY
+              int[] ints = (int[]) value;
+              int length = ints.length;
+              long[] longs = new long[length];
+              ArrayCopyUtils.copy(ints, longs, length);
+              setColumn(columnarBuilder, byteBuffer, longs);
+            } else {
+              setColumn(columnarBuilder, byteBuffer, (long[]) value);
+            }
+          }
+          break;
+        case FLOAT_ARRAY:
+          for (Object value : column) {
+            setColumn(columnarBuilder, byteBuffer, (float[]) value);
+          }
+          break;
+        case DOUBLE_ARRAY:
+          for (Object value : column) {
+            // DOUBLE_ARRAY type covers INT_ARRAY, LONG_ARRAY, FLOAT_ARRAY and DOUBLE_ARRAY
+            if (value instanceof int[]) {
+              int[] ints = (int[]) value;
+              int length = ints.length;
+              double[] doubles = new double[length];
+              ArrayCopyUtils.copy(ints, doubles, length);
+              setColumn(columnarBuilder, byteBuffer, doubles);
+            } else if (value instanceof long[]) {
+              long[] longs = (long[]) value;
+              int length = longs.length;
+              double[] doubles = new double[length];
+              ArrayCopyUtils.copy(longs, doubles, length);
+              setColumn(columnarBuilder, byteBuffer, doubles);
+            } else if (value instanceof float[]) {
+              float[] floats = (float[]) value;
+              int length = floats.length;
+              double[] doubles = new double[length];
+              ArrayCopyUtils.copy(floats, doubles, length);
+              setColumn(columnarBuilder, byteBuffer, doubles);
+            } else {
+              setColumn(columnarBuilder, byteBuffer, (double[]) value);
+            }
+          }
+          break;
+        case BYTES_ARRAY:
+        case STRING_ARRAY:
+          for (Object value : column) {
+            setColumn(columnarBuilder, byteBuffer, i, (String[]) value);
+          }
+          break;
+        default:
+          throw new IllegalStateException(String.format(
+              "Unsupported data type: %s for column: %s", columnarBuilder._columnDataType[i],
+              columnarBuilder._dataSchema.getColumnName(i)));
+      }
+      columnarBuilder._fixedSizeDataByteArrayOutputStream.write(byteBuffer.array(), 0, byteBuffer.position());
+    }
+    return buildColumnarBlock(columnarBuilder);
+  }
+
+  private static RowDataBlock buildRowBlock(DataBlockBuilder builder) {
+    return new RowDataBlock(builder._numRows, builder._dataSchema, builder._reverseDictionaryMap,
+        builder._fixedSizeDataByteArrayOutputStream.toByteArray(),
+        builder._variableSizeDataByteArrayOutputStream.toByteArray());
+  }
+
+  private static ColumnarDataBlock buildColumnarBlock(DataBlockBuilder builder) {
+    return new ColumnarDataBlock(builder._numRows, builder._dataSchema, builder._reverseDictionaryMap,
+        builder._fixedSizeDataByteArrayOutputStream.toByteArray(),
+        builder._variableSizeDataByteArrayOutputStream.toByteArray());
+  }
+
+  private static void setColumn(DataBlockBuilder builder, ByteBuffer byteBuffer, BigDecimal value)
+      throws IOException {
+    byteBuffer.putInt(builder._variableSizeDataByteArrayOutputStream.size());
+    byte[] bytes = BigDecimalUtils.serialize(value);
+    byteBuffer.putInt(bytes.length);
+    builder._variableSizeDataByteArrayOutputStream.write(bytes);
+  }
+
+  private static void setColumn(DataBlockBuilder builder, ByteBuffer byteBuffer, int colId, String value) {
+    String columnName = builder._dataSchema.getColumnName(colId);
+    Map<String, Integer> dictionary = builder._dictionaryMap.get(columnName);
+    if (dictionary == null) {
+      dictionary = new HashMap<>();
+      builder._dictionaryMap.put(columnName, dictionary);
+      builder._reverseDictionaryMap.put(columnName, new HashMap<>());
+    }
+    Integer dictId = dictionary.get(value);
+    if (dictId == null) {
+      dictId = dictionary.size();
+      dictionary.put(value, dictId);
+      builder._reverseDictionaryMap.get(columnName).put(dictId, value);
+    }
+    byteBuffer.putInt(dictId);
+  }
+
+  private static void setColumn(DataBlockBuilder builder, ByteBuffer byteBuffer, ByteArray value)
+      throws IOException {
+    byteBuffer.putInt(builder._variableSizeDataByteArrayOutputStream.size());
+    byte[] bytes = value.getBytes();
+    byteBuffer.putInt(bytes.length);
+    builder._variableSizeDataByteArrayOutputStream.write(bytes);
+  }
+
+  private static void setColumn(DataBlockBuilder builder, ByteBuffer byteBuffer, Object value)
+      throws IOException {
+    byteBuffer.putInt(builder._variableSizeDataByteArrayOutputStream.size());
+    int objectTypeValue = ObjectSerDeUtils.ObjectType.getObjectType(value).getValue();
+    byte[] bytes = ObjectSerDeUtils.serialize(value, objectTypeValue);
+    byteBuffer.putInt(bytes.length);
+    builder._variableSizeDataOutputStream.writeInt(objectTypeValue);
+    builder._variableSizeDataByteArrayOutputStream.write(bytes);
+  }
+
+  private static void setColumn(DataBlockBuilder builder, ByteBuffer byteBuffer, int[] values)
+      throws IOException {
+    byteBuffer.putInt(builder._variableSizeDataByteArrayOutputStream.size());
+    byteBuffer.putInt(values.length);
+    for (int value : values) {
+      builder._variableSizeDataOutputStream.writeInt(value);
+    }
+  }
+
+  private static void setColumn(DataBlockBuilder builder, ByteBuffer byteBuffer, long[] values)
+      throws IOException {
+    byteBuffer.putInt(builder._variableSizeDataByteArrayOutputStream.size());
+    byteBuffer.putInt(values.length);
+    for (long value : values) {
+      builder._variableSizeDataOutputStream.writeLong(value);
+    }
+  }
+
+  private static void setColumn(DataBlockBuilder builder, ByteBuffer byteBuffer, float[] values)
+      throws IOException {
+    byteBuffer.putInt(builder._variableSizeDataByteArrayOutputStream.size());
+    byteBuffer.putInt(values.length);
+    for (float value : values) {
+      builder._variableSizeDataOutputStream.writeFloat(value);
+    }
+  }
+
+  private static void setColumn(DataBlockBuilder builder, ByteBuffer byteBuffer, double[] values)
+      throws IOException {
+    byteBuffer.putInt(builder._variableSizeDataByteArrayOutputStream.size());
+    byteBuffer.putInt(values.length);
+    for (double value : values) {
+      builder._variableSizeDataOutputStream.writeDouble(value);
+    }
+  }
+
+  private static void setColumn(DataBlockBuilder builder, ByteBuffer byteBuffer, int colId, String[] values)
+      throws IOException {
+    byteBuffer.putInt(builder._variableSizeDataByteArrayOutputStream.size());
+    byteBuffer.putInt(values.length);
+
+    String columnName = builder._dataSchema.getColumnName(colId);
+    Map<String, Integer> dictionary = builder._dictionaryMap.get(columnName);
+    if (dictionary == null) {
+      dictionary = new HashMap<>();
+      builder._dictionaryMap.put(columnName, dictionary);
+      builder._reverseDictionaryMap.put(columnName, new HashMap<>());
+    }
+
+    for (String value : values) {
+      Integer dictId = dictionary.get(value);
+      if (dictId == null) {
+        dictId = dictionary.size();
+        dictionary.put(value, dictId);
+        builder._reverseDictionaryMap.get(columnName).put(dictId, value);
+      }
+      builder._variableSizeDataOutputStream.writeInt(dictId);
+    }
+  }
+}
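
A hypothetical end-to-end usage sketch of the builder (class name and values are illustrative, not from the
commit; the test classes added in this patch exercise the same path): build a row-oriented block, serialize
it, and read it back through DataBlockUtils.

    import java.nio.ByteBuffer;
    import java.util.Arrays;
    import java.util.List;
    import org.apache.pinot.common.utils.DataSchema;
    import org.apache.pinot.query.runtime.blocks.BaseDataBlock;
    import org.apache.pinot.query.runtime.blocks.DataBlockBuilder;
    import org.apache.pinot.query.runtime.blocks.DataBlockUtils;
    import org.apache.pinot.query.runtime.blocks.RowDataBlock;

    public class DataBlockBuilderSketch {
      public static void main(String[] args) throws Exception {
        DataSchema schema = new DataSchema(new String[]{"intCol", "strCol"},
            new DataSchema.ColumnDataType[]{DataSchema.ColumnDataType.INT, DataSchema.ColumnDataType.STRING});
        List<Object[]> rows = Arrays.asList(new Object[]{1, "foo"}, new Object[]{2, "bar"});

        // Row-major block: the fixed-size section holds one 8-byte row per input row (4-byte int + 4-byte dict id).
        RowDataBlock rowBlock = DataBlockBuilder.buildFromRows(rows, schema);

        // Round trip: the leading int encodes version + Type.ROW, so getDataBlock() reconstructs a RowDataBlock.
        BaseDataBlock deserialized = DataBlockUtils.getDataBlock(ByteBuffer.wrap(rowBlock.toBytes()));
        System.out.println(deserialized.getInt(1, 0));     // 2
        System.out.println(deserialized.getString(1, 1));  // "bar", resolved via the string dictionary
      }
    }
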
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/blocks/DataBlockUtils.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/blocks/DataBlockUtils.java
new file mode 100644
index 0000000000..690be1353a
--- /dev/null
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/blocks/DataBlockUtils.java
@@ -0,0 +1,174 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.query.runtime.blocks;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import org.apache.pinot.common.exception.QueryException;
+import org.apache.pinot.common.response.ProcessingException;
+import org.apache.pinot.common.utils.DataSchema;
+import org.apache.pinot.common.utils.DataTable;
+
+
+public final class DataBlockUtils {
+  protected static final int VERSION_TYPE_SHIFT = 5;
+  private DataBlockUtils() {
+    // do not instantiate.
+  }
+
+  private static final DataSchema EMPTY_SCHEMA = new DataSchema(new String[0], new DataSchema.ColumnDataType[0]);
+  private static final MetadataBlock EOS_DATA_BLOCK = new MetadataBlock(EMPTY_SCHEMA);
+  static {
+    EOS_DATA_BLOCK._metadata.put(DataTable.MetadataKey.TABLE.getName(), "END_OF_STREAM");
+  }
+  private static final TransferableBlock EOS_TRANSFERABLE_BLOCK = new TransferableBlock(EOS_DATA_BLOCK);
+
+  public static TransferableBlock getEndOfStreamTransferableBlock() {
+    return EOS_TRANSFERABLE_BLOCK;
+  }
+
+  public static MetadataBlock getEndOfStreamDataBlock() {
+    return EOS_DATA_BLOCK;
+  }
+
+  public static MetadataBlock getErrorDataBlock(Exception e) {
+    MetadataBlock errorBlock = new MetadataBlock(EMPTY_SCHEMA);
+    errorBlock._metadata.put(DataTable.MetadataKey.TABLE.getName(), "ERROR");
+    if (e instanceof ProcessingException) {
+      errorBlock.addException(((ProcessingException) e).getErrorCode(), e.getMessage());
+    } else {
+      errorBlock.addException(QueryException.UNKNOWN_ERROR_CODE, e.getMessage());
+    }
+    return errorBlock;
+  }
+
+  public static TransferableBlock getErrorTransferableBlock(Exception e) {
+    return new TransferableBlock(getErrorDataBlock(e));
+  }
+
+  public static MetadataBlock getEmptyDataBlock(DataSchema dataSchema) {
+    return dataSchema == null ? EOS_DATA_BLOCK : new MetadataBlock(dataSchema);
+  }
+
+  public static TransferableBlock getEmptyTransferableBlock(DataSchema dataSchema) {
+    return new TransferableBlock(getEmptyDataBlock(dataSchema));
+  }
+
+  public static boolean isEndOfStream(TransferableBlock transferableBlock) {
+    return transferableBlock.getType().equals(BaseDataBlock.Type.METADATA)
+        && "END_OF_STREAM".equals(transferableBlock.getDataBlock().getMetadata()
+            .get(DataTable.MetadataKey.TABLE.getName()));
+  }
+
+  public static BaseDataBlock getDataBlock(ByteBuffer byteBuffer)
+      throws IOException {
+    int versionType = byteBuffer.getInt();
+    int version = versionType & ((1 << VERSION_TYPE_SHIFT) - 1);
+    BaseDataBlock.Type type = BaseDataBlock.Type.fromOrdinal(versionType >> VERSION_TYPE_SHIFT);
+    switch (type) {
+      case COLUMNAR:
+        return new ColumnarDataBlock(byteBuffer);
+      case ROW:
+        return new RowDataBlock(byteBuffer);
+      case METADATA:
+        return new MetadataBlock(byteBuffer);
+      default:
+        throw new UnsupportedOperationException("Unsupported data table version: " + version + " with type: " + type);
+    }
+  }
+
+  /**
+   * Given a {@link DataSchema}, compute each column's offset and fill them into the passed in array, then return the
+   * row size in bytes.
+   *
+   * @param dataSchema data schema.
+   * @param columnOffsets array of column offsets.
+   * @return row size in bytes.
+   */
+  public static int computeColumnOffsets(DataSchema dataSchema, int[] columnOffsets) {
+    int numColumns = columnOffsets.length;
+    assert numColumns == dataSchema.size();
+
+    DataSchema.ColumnDataType[] storedColumnDataTypes = dataSchema.getStoredColumnDataTypes();
+    int rowSizeInBytes = 0;
+    for (int i = 0; i < numColumns; i++) {
+      columnOffsets[i] = rowSizeInBytes;
+      switch (storedColumnDataTypes[i]) {
+        case INT:
+          rowSizeInBytes += 4;
+          break;
+        case LONG:
+          rowSizeInBytes += 8;
+          break;
+        case FLOAT:
+          rowSizeInBytes += 4;
+          break;
+        case DOUBLE:
+          rowSizeInBytes += 8;
+          break;
+        case STRING:
+          rowSizeInBytes += 4;
+          break;
+        // Object and array. (POSITION|LENGTH)
+        default:
+          rowSizeInBytes += 8;
+          break;
+      }
+    }
+
+    return rowSizeInBytes;
+  }
+
+  /**
+   * Given a {@link DataSchema}, compute each column's size in bytes and fill it into the passed-in array.
+   *
+   * @param dataSchema data schema.
+   * @param columnSizes array to fill with each column's size in bytes.
+   */
+  public static void computeColumnSizeInBytes(DataSchema dataSchema, int[] columnSizes) {
+    int numColumns = columnSizes.length;
+    assert numColumns == dataSchema.size();
+
+    DataSchema.ColumnDataType[] storedColumnDataTypes = dataSchema.getStoredColumnDataTypes();
+    for (int i = 0; i < numColumns; i++) {
+      switch (storedColumnDataTypes[i]) {
+        case INT:
+          columnSizes[i] = 4;
+          break;
+        case LONG:
+          columnSizes[i] = 8;
+          break;
+        case FLOAT:
+          columnSizes[i] = 4;
+          break;
+        case DOUBLE:
+          columnSizes[i] = 8;
+          break;
+        case STRING:
+          columnSizes[i] = 4;
+          break;
+        // Object and array. (POSITION|LENGTH)
+        default:
+          columnSizes[i] = 8;
+          break;
+      }
+    }
+  }
+}
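
For reference, a standalone sketch (not part of the patch) of the version/type header that getDataBlockVersionType() writes and getDataBlock() reads above: the low 5 bits hold the serialization version and the remaining high bits hold the block type ordinal.

    import java.nio.ByteBuffer;

    public class VersionTypeHeaderSketch {
      // Mirrors DataBlockUtils.VERSION_TYPE_SHIFT.
      private static final int VERSION_TYPE_SHIFT = 5;

      public static void main(String[] args) {
        int version = 1;      // e.g. the VERSION constant of a block implementation
        int typeOrdinal = 2;  // placeholder ordinal of BaseDataBlock.Type (ROW/COLUMNAR/METADATA)

        // Encode, as getDataBlockVersionType() does.
        int versionType = version + (typeOrdinal << VERSION_TYPE_SHIFT);
        ByteBuffer buffer = ByteBuffer.allocate(Integer.BYTES);
        buffer.putInt(versionType);
        buffer.flip();

        // Decode, as DataBlockUtils.getDataBlock() does before dispatching on the block type.
        int header = buffer.getInt();
        int decodedVersion = header & ((1 << VERSION_TYPE_SHIFT) - 1);
        int decodedTypeOrdinal = header >> VERSION_TYPE_SHIFT;
        System.out.println("version=" + decodedVersion + ", typeOrdinal=" + decodedTypeOrdinal);
      }
    }
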
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/blocks/DataTableBlockUtils.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/blocks/DataTableBlockUtils.java
deleted file mode 100644
index c429e2e014..0000000000
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/blocks/DataTableBlockUtils.java
+++ /dev/null
@@ -1,71 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pinot.query.runtime.blocks;
-
-import org.apache.pinot.common.exception.QueryException;
-import org.apache.pinot.common.utils.DataSchema;
-import org.apache.pinot.common.utils.DataTable;
-import org.apache.pinot.core.common.datatable.DataTableBuilder;
-
-
-public final class DataTableBlockUtils {
-  private DataTableBlockUtils() {
-    // do not instantiate.
-  }
-
-  // used to indicate a datatable block status
-  private static final DataSchema EMPTY_SCHEMA = new DataSchema(new String[0], new DataSchema.ColumnDataType[0]);
-  private static final DataTable EMPTY_DATATABLE = new DataTableBuilder(EMPTY_SCHEMA).build();
-  private static final DataTableBlock END_OF_STREAM_DATATABLE_BLOCK = new DataTableBlock(EMPTY_DATATABLE);
-
-  public static DataTableBlock getEndOfStreamDataTableBlock() {
-    return END_OF_STREAM_DATATABLE_BLOCK;
-  }
-
-  public static DataTable getEndOfStreamDataTable() {
-    return EMPTY_DATATABLE;
-  }
-
-  public static DataTable getErrorDataTable(Exception e) {
-    DataTable errorDataTable = new DataTableBuilder(EMPTY_SCHEMA).build();
-    errorDataTable.addException(QueryException.UNKNOWN_ERROR_CODE, e.getMessage());
-    return errorDataTable;
-  }
-
-  public static DataTableBlock getErrorDatatableBlock(Exception e) {
-    return new DataTableBlock(getErrorDataTable(e));
-  }
-
-  public static DataTable getEmptyDataTable(DataSchema dataSchema) {
-    if (dataSchema != null) {
-      return new DataTableBuilder(dataSchema).build();
-    } else {
-      return EMPTY_DATATABLE;
-    }
-  }
-
-  public static DataTableBlock getEmptyDataTableBlock(DataSchema dataSchema) {
-    return new DataTableBlock(getEmptyDataTable(dataSchema));
-  }
-
-  public static boolean isEndOfStream(DataTableBlock dataTableBlock) {
-    DataSchema dataSchema = dataTableBlock.getDataTable().getDataSchema();
-    return dataSchema.getColumnNames().length == 0 && dataSchema.getColumnDataTypes().length == 0;
-  }
-}
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/blocks/MetadataBlock.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/blocks/MetadataBlock.java
new file mode 100644
index 0000000000..a3ffc7a126
--- /dev/null
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/blocks/MetadataBlock.java
@@ -0,0 +1,66 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.query.runtime.blocks;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Collections;
+import org.apache.pinot.common.utils.DataSchema;
+
+
+/**
+ * A metadata-only block. It carries no row data and is used to convey end-of-stream, error, and other metadata
+ * between stages.
+ */
+public class MetadataBlock extends BaseDataBlock {
+  private static final int VERSION = 1;
+
+  public MetadataBlock(DataSchema dataSchema) {
+    super(0, dataSchema, Collections.emptyMap(), new byte[]{0}, new byte[]{0});
+  }
+
+  public MetadataBlock(ByteBuffer byteBuffer)
+      throws IOException {
+    super(byteBuffer);
+  }
+
+  @Override
+  protected int getDataBlockVersionType() {
+    return VERSION + (Type.METADATA.ordinal() << DataBlockUtils.VERSION_TYPE_SHIFT);
+  }
+
+  @Override
+  protected void positionCursorInFixSizedBuffer(int rowId, int colId) {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  protected int positionCursorInVariableBuffer(int rowId, int colId) {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public MetadataBlock toMetadataOnlyDataTable() {
+    return this;
+  }
+
+  @Override
+  public MetadataBlock toDataOnlyDataTable() {
+    return new MetadataBlock(_dataSchema);
+  }
+}
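
A minimal usage sketch of the metadata-block conventions above (assuming the same package as the block classes, so no imports are needed; the class and method names here are illustrative only):

    public final class MetadataBlockSketch {
      static void demo() {
        // End-of-stream: a shared MetadataBlock whose TABLE metadata entry is "END_OF_STREAM".
        TransferableBlock eos = DataBlockUtils.getEndOfStreamTransferableBlock();
        assert DataBlockUtils.isEndOfStream(eos);

        // Error: a MetadataBlock carrying the exception message keyed by its error code.
        TransferableBlock error = DataBlockUtils.getErrorTransferableBlock(new RuntimeException("boom"));
        assert !DataBlockUtils.isEndOfStream(error);
      }
    }
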
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/blocks/RowDataBlock.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/blocks/RowDataBlock.java
new file mode 100644
index 0000000000..f8520c54f6
--- /dev/null
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/blocks/RowDataBlock.java
@@ -0,0 +1,90 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.query.runtime.blocks;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Map;
+import org.apache.pinot.common.utils.DataSchema;
+
+
+/**
+ * Wrapper for row-wise data table. It stores data in row-major format.
+ */
+public class RowDataBlock extends BaseDataBlock {
+  private static final int VERSION = 1;
+  protected int[] _columnOffsets;
+  protected int _rowSizeInBytes;
+
+  public RowDataBlock() {
+    super();
+  }
+
+  public RowDataBlock(int numRows, DataSchema dataSchema, Map<String, Map<Integer, String>> dictionaryMap,
+      byte[] fixedSizeDataBytes, byte[] variableSizeDataBytes) {
+    super(numRows, dataSchema, dictionaryMap, fixedSizeDataBytes, variableSizeDataBytes);
+    computeBlockObjectConstants();
+  }
+
+  public RowDataBlock(ByteBuffer byteBuffer)
+      throws IOException {
+    super(byteBuffer);
+    computeBlockObjectConstants();
+  }
+
+  protected void computeBlockObjectConstants() {
+    if (_dataSchema != null) {
+      _columnOffsets = new int[_numColumns];
+      _rowSizeInBytes = DataBlockUtils.computeColumnOffsets(_dataSchema, _columnOffsets);
+    }
+  }
+
+  @Override
+  protected int getDataBlockVersionType() {
+    return VERSION + (Type.ROW.ordinal() << DataBlockUtils.VERSION_TYPE_SHIFT);
+  }
+
+  @Override
+  protected void positionCursorInFixSizedBuffer(int rowId, int colId) {
+    int position = rowId * _rowSizeInBytes + _columnOffsets[colId];
+    _fixedSizeData.position(position);
+  }
+
+  @Override
+  protected int positionCursorInVariableBuffer(int rowId, int colId) {
+    positionCursorInFixSizedBuffer(rowId, colId);
+    _variableSizeData.position(_fixedSizeData.getInt());
+    return _fixedSizeData.getInt();
+  }
+
+  @Override
+  public RowDataBlock toMetadataOnlyDataTable() {
+    RowDataBlock metadataOnlyDataTable = new RowDataBlock();
+    metadataOnlyDataTable._metadata.putAll(_metadata);
+    metadataOnlyDataTable._errCodeToExceptionMap.putAll(_errCodeToExceptionMap);
+    return metadataOnlyDataTable;
+  }
+
+  @Override
+  public RowDataBlock toDataOnlyDataTable() {
+    return new RowDataBlock(_numRows, _dataSchema, _dictionaryMap, _fixedSizeDataBytes, _variableSizeDataBytes);
+  }
+
+  // TODO: add whole-row access methods.
+}
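
To make the fixed-width addressing above concrete, a small arithmetic sketch (not from the patch) for a hypothetical (INT, LONG, STRING) schema, using the per-type sizes from DataBlockUtils.computeColumnOffsets():

    public class RowOffsetSketch {
      public static void main(String[] args) {
        // INT = 4 bytes, LONG = 8 bytes, STRING = 4 bytes (dictionary id) in the fixed-size buffer.
        int[] columnOffsets = {0, 4, 12};
        int rowSizeInBytes = 4 + 8 + 4;  // 16

        // positionCursorInFixSizedBuffer(rowId, colId) seeks to this absolute position:
        int rowId = 3;
        int colId = 2;  // the STRING column
        int position = rowId * rowSizeInBytes + columnOffsets[colId];
        System.out.println(position);  // 3 * 16 + 12 = 60
      }
    }
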
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/blocks/DataTableBlock.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/blocks/TransferableBlock.java
similarity index 70%
rename from pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/blocks/DataTableBlock.java
rename to pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/blocks/TransferableBlock.java
index a8cc63c624..9284011f06 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/blocks/DataTableBlock.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/blocks/TransferableBlock.java
@@ -19,31 +19,39 @@
 package org.apache.pinot.query.runtime.blocks;
 
 import java.io.IOException;
-import org.apache.pinot.common.utils.DataTable;
 import org.apache.pinot.core.common.Block;
 import org.apache.pinot.core.common.BlockDocIdSet;
 import org.apache.pinot.core.common.BlockDocIdValueSet;
 import org.apache.pinot.core.common.BlockMetadata;
 import org.apache.pinot.core.common.BlockValSet;
-import org.apache.pinot.core.operator.blocks.InstanceResponseBlock;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 
 /**
- * A {@code DataTableBlock} is a row-based data block backed by a {@link DataTable}.
+ * A {@code TransferableBlock} is a wrapper around {@link BaseDataBlock} for transferring data using
+ * {@link org.apache.pinot.common.proto.Mailbox}.
  */
-public class DataTableBlock implements Block {
-  private static final Logger LOGGER = LoggerFactory.getLogger(InstanceResponseBlock.class);
+public class TransferableBlock implements Block {
 
-  private DataTable _dataTable;
+  private BaseDataBlock _dataBlock;
+  private BaseDataBlock.Type _type;
 
-  public DataTableBlock(DataTable dataTable) {
-    _dataTable = dataTable;
+  public TransferableBlock(BaseDataBlock dataBlock) {
+    _dataBlock = dataBlock;
+    _type = dataBlock instanceof ColumnarDataBlock ? BaseDataBlock.Type.COLUMNAR
+        : dataBlock instanceof RowDataBlock ? BaseDataBlock.Type.ROW : BaseDataBlock.Type.METADATA;
   }
 
-  public DataTable getDataTable() {
-    return _dataTable;
+  public BaseDataBlock getDataBlock() {
+    return _dataBlock;
+  }
+
+  public BaseDataBlock.Type getType() {
+    return _type;
+  }
+
+  public byte[] toBytes()
+      throws IOException {
+    return _dataBlock.toBytes();
   }
 
   @Override
@@ -65,9 +73,4 @@ public class DataTableBlock implements Block {
   public BlockMetadata getMetadata() {
     throw new UnsupportedOperationException();
   }
-
-  public byte[] toBytes()
-      throws IOException {
-    return _dataTable.toBytes();
-  }
 }
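
The consumption pattern that the executor and dispatcher changes below both follow can be sketched as (only the block and utility classes are from the patch; the wrapper class and method names are illustrative):

    public final class TransferableBlockDrainSketch {
      // Sketch only: drain an operator until the END_OF_STREAM metadata block arrives.
      public static void drain(org.apache.pinot.core.operator.BaseOperator<TransferableBlock> operator)
          throws java.io.IOException {
        TransferableBlock block = operator.nextBlock();
        while (!DataBlockUtils.isEndOfStream(block)) {
          byte[] payload = block.toBytes();  // the bytes MailboxSendOperator puts on the wire
          // ... hand payload (or block.getDataBlock()) to the consumer ...
          block = operator.nextBlock();
        }
      }
    }
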
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/executor/WorkerQueryExecutor.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/executor/WorkerQueryExecutor.java
index 9a66bf86c1..d8ec43e040 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/executor/WorkerQueryExecutor.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/executor/WorkerQueryExecutor.java
@@ -36,8 +36,8 @@ import org.apache.pinot.query.planner.stage.MailboxReceiveNode;
 import org.apache.pinot.query.planner.stage.MailboxSendNode;
 import org.apache.pinot.query.planner.stage.ProjectNode;
 import org.apache.pinot.query.planner.stage.StageNode;
-import org.apache.pinot.query.runtime.blocks.DataTableBlock;
-import org.apache.pinot.query.runtime.blocks.DataTableBlockUtils;
+import org.apache.pinot.query.runtime.blocks.DataBlockUtils;
+import org.apache.pinot.query.runtime.blocks.TransferableBlock;
 import org.apache.pinot.query.runtime.operator.HashJoinOperator;
 import org.apache.pinot.query.runtime.operator.MailboxReceiveOperator;
 import org.apache.pinot.query.runtime.operator.MailboxSendOperator;
@@ -83,12 +83,12 @@ public class WorkerQueryExecutor {
       ExecutorService executorService) {
     long requestId = Long.parseLong(requestMetadataMap.get("REQUEST_ID"));
     StageNode stageRoot = queryRequest.getStageRoot();
-    BaseOperator<DataTableBlock> rootOperator = getOperator(requestId, stageRoot, queryRequest.getMetadataMap());
+    BaseOperator<TransferableBlock> rootOperator = getOperator(requestId, stageRoot, queryRequest.getMetadataMap());
     executorService.submit(new TraceRunnable() {
       @Override
       public void runJob() {
         ThreadTimer executionThreadTimer = new ThreadTimer();
-        while (!DataTableBlockUtils.isEndOfStream(rootOperator.nextBlock())) {
+        while (!DataBlockUtils.isEndOfStream(rootOperator.nextBlock())) {
           LOGGER.debug("Result Block acquired");
         }
         LOGGER.info("Execution time:" + executionThreadTimer.getThreadTimeNs());
@@ -97,7 +97,7 @@ public class WorkerQueryExecutor {
   }
 
   // TODO: split this PhysicalPlanner into a separate module
-  private BaseOperator<DataTableBlock> getOperator(long requestId, StageNode stageNode,
+  private BaseOperator<TransferableBlock> getOperator(long requestId, StageNode stageNode,
       Map<Integer, StageMetadata> metadataMap) {
     // TODO: optimize this into a framework. (physical planner)
     if (stageNode instanceof MailboxReceiveNode) {
@@ -107,15 +107,15 @@ public class WorkerQueryExecutor {
           requestId, receiveNode.getSenderStageId());
     } else if (stageNode instanceof MailboxSendNode) {
       MailboxSendNode sendNode = (MailboxSendNode) stageNode;
-      BaseOperator<DataTableBlock> nextOperator = getOperator(requestId, sendNode.getInputs().get(0), metadataMap);
+      BaseOperator<TransferableBlock> nextOperator = getOperator(requestId, sendNode.getInputs().get(0), metadataMap);
       StageMetadata receivingStageMetadata = metadataMap.get(sendNode.getReceiverStageId());
       return new MailboxSendOperator(_mailboxService, nextOperator, receivingStageMetadata.getServerInstances(),
           sendNode.getExchangeType(), sendNode.getPartitionKeySelector(), _hostName, _port, requestId,
           sendNode.getStageId());
     } else if (stageNode instanceof JoinNode) {
       JoinNode joinNode = (JoinNode) stageNode;
-      BaseOperator<DataTableBlock> leftOperator = getOperator(requestId, joinNode.getInputs().get(0), metadataMap);
-      BaseOperator<DataTableBlock> rightOperator = getOperator(requestId, joinNode.getInputs().get(1), metadataMap);
+      BaseOperator<TransferableBlock> leftOperator = getOperator(requestId, joinNode.getInputs().get(0), metadataMap);
+      BaseOperator<TransferableBlock> rightOperator = getOperator(requestId, joinNode.getInputs().get(1), metadataMap);
       return new HashJoinOperator(leftOperator, rightOperator, joinNode.getCriteria());
     } else if (stageNode instanceof FilterNode) {
       throw new UnsupportedOperationException("Unsupported!");
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/HashJoinOperator.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/HashJoinOperator.java
index 238c1d807b..ff99dae5ed 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/HashJoinOperator.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/HashJoinOperator.java
@@ -24,14 +24,15 @@ import java.util.HashMap;
 import java.util.List;
 import javax.annotation.Nullable;
 import org.apache.pinot.common.utils.DataSchema;
-import org.apache.pinot.common.utils.DataTable;
 import org.apache.pinot.core.common.Operator;
 import org.apache.pinot.core.operator.BaseOperator;
 import org.apache.pinot.core.query.selection.SelectionOperatorUtils;
 import org.apache.pinot.query.planner.partitioning.KeySelector;
 import org.apache.pinot.query.planner.stage.JoinNode;
-import org.apache.pinot.query.runtime.blocks.DataTableBlock;
-import org.apache.pinot.query.runtime.blocks.DataTableBlockUtils;
+import org.apache.pinot.query.runtime.blocks.BaseDataBlock;
+import org.apache.pinot.query.runtime.blocks.DataBlockBuilder;
+import org.apache.pinot.query.runtime.blocks.DataBlockUtils;
+import org.apache.pinot.query.runtime.blocks.TransferableBlock;
 
 
 /**
@@ -42,12 +43,12 @@ import org.apache.pinot.query.runtime.blocks.DataTableBlockUtils;
  *
  * <p>For each data block received from the left table, it will generate a joined data block.
  */
-public class HashJoinOperator extends BaseOperator<DataTableBlock> {
+public class HashJoinOperator extends BaseOperator<TransferableBlock> {
   private static final String EXPLAIN_NAME = "BROADCAST_JOIN";
 
   private final HashMap<Object, List<Object[]>> _broadcastHashTable;
-  private final BaseOperator<DataTableBlock> _leftTableOperator;
-  private final BaseOperator<DataTableBlock> _rightTableOperator;
+  private final BaseOperator<TransferableBlock> _leftTableOperator;
+  private final BaseOperator<TransferableBlock> _rightTableOperator;
 
   private DataSchema _leftTableSchema;
   private DataSchema _rightTableSchema;
@@ -56,8 +57,8 @@ public class HashJoinOperator extends BaseOperator<DataTableBlock> {
   private KeySelector<Object[], Object> _leftKeySelector;
   private KeySelector<Object[], Object> _rightKeySelector;
 
-  public HashJoinOperator(BaseOperator<DataTableBlock> leftTableOperator,
-      BaseOperator<DataTableBlock> rightTableOperator, List<JoinNode.JoinClause> criteria) {
+  public HashJoinOperator(BaseOperator<TransferableBlock> leftTableOperator,
+      BaseOperator<TransferableBlock> rightTableOperator, List<JoinNode.JoinClause> criteria) {
     // TODO: this assumes right table is broadcast.
     _leftKeySelector = criteria.get(0).getLeftJoinKeySelector();
     _rightKeySelector = criteria.get(0).getRightJoinKeySelector();
@@ -80,25 +81,25 @@ public class HashJoinOperator extends BaseOperator<DataTableBlock> {
   }
 
   @Override
-  protected DataTableBlock getNextBlock() {
+  protected TransferableBlock getNextBlock() {
     buildBroadcastHashTable();
     try {
-      return new DataTableBlock(buildJoinedDataTable(_leftTableOperator.nextBlock()));
+      return new TransferableBlock(buildJoinedDataBlock(_leftTableOperator.nextBlock()));
     } catch (Exception e) {
-      return DataTableBlockUtils.getErrorDatatableBlock(e);
+      return DataBlockUtils.getErrorTransferableBlock(e);
     }
   }
 
   private void buildBroadcastHashTable() {
     if (!_isHashTableBuilt) {
-      DataTableBlock rightBlock = _rightTableOperator.nextBlock();
-      while (!DataTableBlockUtils.isEndOfStream(rightBlock)) {
-        DataTable dataTable = rightBlock.getDataTable();
-        _rightTableSchema = dataTable.getDataSchema();
-        int numRows = dataTable.getNumberOfRows();
+      TransferableBlock rightBlock = _rightTableOperator.nextBlock();
+      while (!DataBlockUtils.isEndOfStream(rightBlock)) {
+        BaseDataBlock dataBlock = rightBlock.getDataBlock();
+        _rightTableSchema = dataBlock.getDataSchema();
+        int numRows = dataBlock.getNumberOfRows();
         // put all the rows into corresponding hash collections keyed by the key selector function.
         for (int rowId = 0; rowId < numRows; rowId++) {
-          Object[] objects = SelectionOperatorUtils.extractRowFromDataTable(dataTable, rowId);
+          Object[] objects = SelectionOperatorUtils.extractRowFromDataTable(dataBlock, rowId);
           List<Object[]> hashCollection =
               _broadcastHashTable.computeIfAbsent(_rightKeySelector.getKey(objects), k -> new ArrayList<>());
           hashCollection.add(objects);
@@ -109,25 +110,25 @@ public class HashJoinOperator extends BaseOperator<DataTableBlock> {
     }
   }
 
-  private DataTable buildJoinedDataTable(DataTableBlock block)
+  private BaseDataBlock buildJoinedDataBlock(TransferableBlock block)
       throws Exception {
-    if (DataTableBlockUtils.isEndOfStream(block)) {
-      return DataTableBlockUtils.getEndOfStreamDataTable();
+    if (DataBlockUtils.isEndOfStream(block)) {
+      return DataBlockUtils.getEndOfStreamDataBlock();
     }
     List<Object[]> rows = new ArrayList<>();
-    DataTable dataTable = block.getDataTable();
-    _leftTableSchema = dataTable.getDataSchema();
+    BaseDataBlock dataBlock = block.getDataBlock();
+    _leftTableSchema = dataBlock.getDataSchema();
     _resultRowSize = _leftTableSchema.size() + _rightTableSchema.size();
-    int numRows = dataTable.getNumberOfRows();
+    int numRows = dataBlock.getNumberOfRows();
     for (int rowId = 0; rowId < numRows; rowId++) {
-      Object[] leftRow = SelectionOperatorUtils.extractRowFromDataTable(dataTable, rowId);
+      Object[] leftRow = SelectionOperatorUtils.extractRowFromDataTable(dataBlock, rowId);
       List<Object[]> hashCollection =
           _broadcastHashTable.getOrDefault(_leftKeySelector.getKey(leftRow), Collections.emptyList());
       for (Object[] rightRow : hashCollection) {
         rows.add(joinRow(leftRow, rightRow));
       }
     }
-    return SelectionOperatorUtils.getDataTableFromRows(rows, computeSchema());
+    return DataBlockBuilder.buildFromRows(rows, computeSchema());
   }
 
   private Object[] joinRow(Object[] leftRow, Object[] rightRow) {
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxReceiveOperator.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxReceiveOperator.java
index 7b396b13cc..b84d8f40f0 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxReceiveOperator.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxReceiveOperator.java
@@ -24,16 +24,15 @@ import javax.annotation.Nullable;
 import org.apache.calcite.rel.RelDistribution;
 import org.apache.pinot.common.proto.Mailbox;
 import org.apache.pinot.common.utils.DataSchema;
-import org.apache.pinot.common.utils.DataTable;
 import org.apache.pinot.core.common.Operator;
-import org.apache.pinot.core.common.datatable.DataTableFactory;
 import org.apache.pinot.core.operator.BaseOperator;
 import org.apache.pinot.core.transport.ServerInstance;
 import org.apache.pinot.query.mailbox.MailboxService;
 import org.apache.pinot.query.mailbox.ReceivingMailbox;
 import org.apache.pinot.query.mailbox.StringMailboxIdentifier;
-import org.apache.pinot.query.runtime.blocks.DataTableBlock;
-import org.apache.pinot.query.runtime.blocks.DataTableBlockUtils;
+import org.apache.pinot.query.runtime.blocks.BaseDataBlock;
+import org.apache.pinot.query.runtime.blocks.DataBlockUtils;
+import org.apache.pinot.query.runtime.blocks.TransferableBlock;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -42,7 +41,7 @@ import org.slf4j.LoggerFactory;
  * This {@code MailboxReceiveOperator} receives data from a {@link ReceivingMailbox} and serves it out through the
  * {@link BaseOperator#getNextBlock()} API.
  */
-public class MailboxReceiveOperator extends BaseOperator<DataTableBlock> {
+public class MailboxReceiveOperator extends BaseOperator<TransferableBlock> {
   private static final Logger LOGGER = LoggerFactory.getLogger(MailboxReceiveOperator.class);
   private static final String EXPLAIN_NAME = "MAILBOX_RECEIVE";
   private static final long DEFAULT_TIMEOUT_NANO = 10_000_000_000L;
@@ -80,7 +79,7 @@ public class MailboxReceiveOperator extends BaseOperator<DataTableBlock> {
   }
 
   @Override
-  protected DataTableBlock getNextBlock() {
+  protected TransferableBlock getNextBlock() {
     // TODO: do a round robin check against all MailboxContentStreamObservers and find which one that has data.
     boolean hasOpenedMailbox = true;
     DataSchema dataSchema = null;
@@ -99,10 +98,10 @@ public class MailboxReceiveOperator extends BaseOperator<DataTableBlock> {
             if (mailboxContent != null) {
               ByteBuffer byteBuffer = mailboxContent.getPayload().asReadOnlyByteBuffer();
               if (byteBuffer.hasRemaining()) {
-                DataTable dataTable = DataTableFactory.getDataTable(byteBuffer);
-                if (dataTable.getNumberOfRows() > 0) {
+                BaseDataBlock dataBlock = DataBlockUtils.getDataBlock(byteBuffer);
+                if (dataBlock.getNumberOfRows() > 0) {
                   // here we only return data table block when it is not empty.
-                  return new DataTableBlock(dataTable);
+                  return new TransferableBlock(dataBlock);
                 }
               }
             }
@@ -117,7 +116,7 @@ public class MailboxReceiveOperator extends BaseOperator<DataTableBlock> {
     }
     // TODO: we need to at least return one data table with schema if there's no error.
     // we need to condition this on whether there's already things being returned or not.
-    return DataTableBlockUtils.getEndOfStreamDataTableBlock();
+    return DataBlockUtils.getEndOfStreamTransferableBlock();
   }
 
   public RelDistribution.Type getExchangeType() {
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxSendOperator.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxSendOperator.java
index 51f24c97d4..8629663b60 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxSendOperator.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxSendOperator.java
@@ -37,16 +37,18 @@ import org.apache.pinot.query.mailbox.MailboxService;
 import org.apache.pinot.query.mailbox.SendingMailbox;
 import org.apache.pinot.query.mailbox.StringMailboxIdentifier;
 import org.apache.pinot.query.planner.partitioning.KeySelector;
-import org.apache.pinot.query.runtime.blocks.DataTableBlock;
-import org.apache.pinot.query.runtime.blocks.DataTableBlockUtils;
+import org.apache.pinot.query.runtime.blocks.BaseDataBlock;
+import org.apache.pinot.query.runtime.blocks.DataBlockBuilder;
+import org.apache.pinot.query.runtime.blocks.DataBlockUtils;
+import org.apache.pinot.query.runtime.blocks.TransferableBlock;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 
 /**
- * This {@code MailboxSendOperator} is created to send {@link DataTableBlock}s to the receiving end.
+ * This {@code MailboxSendOperator} is created to send {@link TransferableBlock}s to the receiving end.
  */
-public class MailboxSendOperator extends BaseOperator<DataTableBlock> {
+public class MailboxSendOperator extends BaseOperator<TransferableBlock> {
   private static final Logger LOGGER = LoggerFactory.getLogger(MailboxSendOperator.class);
   private static final String EXPLAIN_NAME = "MAILBOX_SEND";
   private static final Set<RelDistribution.Type> SUPPORTED_EXCHANGE_TYPE =
@@ -61,11 +63,11 @@ public class MailboxSendOperator extends BaseOperator<DataTableBlock> {
   private final long _jobId;
   private final int _stageId;
   private final MailboxService<Mailbox.MailboxContent> _mailboxService;
-  private BaseOperator<DataTableBlock> _dataTableBlockBaseOperator;
-  private DataTable _dataTable;
+  private BaseOperator<TransferableBlock> _dataTableBlockBaseOperator;
+  private BaseDataBlock _dataTable;
 
   public MailboxSendOperator(MailboxService<Mailbox.MailboxContent> mailboxService,
-      BaseOperator<DataTableBlock> dataTableBlockBaseOperator, List<ServerInstance> receivingStageInstances,
+      BaseOperator<TransferableBlock> dataTableBlockBaseOperator, List<ServerInstance> receivingStageInstances,
       RelDistribution.Type exchangeType, KeySelector<Object[], Object> keySelector, String hostName, int port,
       long jobId, int stageId) {
     _mailboxService = mailboxService;
@@ -86,7 +88,7 @@ public class MailboxSendOperator extends BaseOperator<DataTableBlock> {
    * we create a {@link org.apache.pinot.core.query.executor.ServerQueryExecutorV1Impl} that can handle the
    * creation of MailboxSendOperator we should not use this API.
    */
-  public MailboxSendOperator(MailboxService<Mailbox.MailboxContent> mailboxService, DataTable dataTable,
+  public MailboxSendOperator(MailboxService<Mailbox.MailboxContent> mailboxService, BaseDataBlock dataTable,
       List<ServerInstance> receivingStageInstances, RelDistribution.Type exchangeType,
       KeySelector<Object[], Object> keySelector, String hostName, int port, long jobId, int stageId) {
     _mailboxService = mailboxService;
@@ -113,14 +115,14 @@ public class MailboxSendOperator extends BaseOperator<DataTableBlock> {
   }
 
   @Override
-  protected DataTableBlock getNextBlock() {
-    DataTable dataTable;
-    DataTableBlock dataTableBlock = null;
+  protected TransferableBlock getNextBlock() {
+    BaseDataBlock dataTable;
+    TransferableBlock transferableBlock = null;
     boolean isEndOfStream;
     if (_dataTableBlockBaseOperator != null) {
-      dataTableBlock = _dataTableBlockBaseOperator.nextBlock();
-      dataTable = dataTableBlock.getDataTable();
-      isEndOfStream = DataTableBlockUtils.isEndOfStream(dataTableBlock);
+      transferableBlock = _dataTableBlockBaseOperator.nextBlock();
+      dataTable = transferableBlock.getDataBlock();
+      isEndOfStream = DataBlockUtils.isEndOfStream(transferableBlock);
     } else {
       dataTable = _dataTable;
       isEndOfStream = true;
@@ -137,7 +139,7 @@ public class MailboxSendOperator extends BaseOperator<DataTableBlock> {
             // we no longer need to send data to the rest of the receiving instances, but we still need to transfer
             // the dataTable over indicating that we are a potential sender. thus next time a random server is selected
             // it might still be useful.
-            dataTable = DataTableBlockUtils.getEmptyDataTable(dataTable.getDataSchema());
+            dataTable = DataBlockUtils.getEmptyDataBlock(dataTable.getDataSchema());
           }
           break;
         case BROADCAST_DISTRIBUTED:
@@ -147,7 +149,7 @@ public class MailboxSendOperator extends BaseOperator<DataTableBlock> {
           break;
         case HASH_DISTRIBUTED:
           // TODO: ensure that server instance list is sorted using same function in sender.
-          List<DataTable> dataTableList = constructPartitionedDataBlock(dataTable, _keySelector,
+          List<BaseDataBlock> dataTableList = constructPartitionedDataBlock(dataTable, _keySelector,
               _receivingStageInstances.size());
           for (int i = 0; i < _receivingStageInstances.size(); i++) {
             sendDataTableBlock(_receivingStageInstances.get(i), dataTableList.get(i), isEndOfStream);
@@ -162,10 +164,10 @@ public class MailboxSendOperator extends BaseOperator<DataTableBlock> {
     } catch (Exception e) {
       LOGGER.error("Exception occur while sending data via mailbox", e);
     }
-    return dataTableBlock;
+    return transferableBlock;
   }
 
-  private static List<DataTable> constructPartitionedDataBlock(DataTable dataTable,
+  private static List<BaseDataBlock> constructPartitionedDataBlock(DataTable dataTable,
       KeySelector<Object[], Object> keySelector, int partitionSize)
       throws Exception {
     List<List<Object[]>> temporaryRows = new ArrayList<>(partitionSize);
@@ -178,10 +180,10 @@ public class MailboxSendOperator extends BaseOperator<DataTableBlock> {
       // TODO: support other partitioning algorithm
       temporaryRows.get(hashToIndex(key, partitionSize)).add(row);
     }
-    List<DataTable> dataTableList = new ArrayList<>(partitionSize);
+    List<BaseDataBlock> dataTableList = new ArrayList<>(partitionSize);
     for (int i = 0; i < partitionSize; i++) {
       List<Object[]> objects = temporaryRows.get(i);
-      dataTableList.add(SelectionOperatorUtils.getDataTableFromRows(objects, dataTable.getDataSchema()));
+      dataTableList.add(DataBlockBuilder.buildFromRows(objects, dataTable.getDataSchema()));
     }
     return dataTableList;
   }
@@ -190,7 +192,7 @@ public class MailboxSendOperator extends BaseOperator<DataTableBlock> {
     return (key.hashCode()) % partitionSize;
   }
 
-  private void sendDataTableBlock(ServerInstance serverInstance, DataTable dataTable, boolean isEndOfStream)
+  private void sendDataTableBlock(ServerInstance serverInstance, BaseDataBlock dataTable, boolean isEndOfStream)
       throws IOException {
     String mailboxId = toMailboxId(serverInstance);
     SendingMailbox<Mailbox.MailboxContent> sendingMailbox = _mailboxService.getSendingMailbox(mailboxId);
@@ -201,10 +203,10 @@ public class MailboxSendOperator extends BaseOperator<DataTableBlock> {
     }
   }
 
-  private Mailbox.MailboxContent toMailboxContent(String mailboxId, DataTable dataTable, boolean isEndOfStream)
+  private Mailbox.MailboxContent toMailboxContent(String mailboxId, BaseDataBlock dataTable, boolean isEndOfStream)
       throws IOException {
     Mailbox.MailboxContent.Builder builder = Mailbox.MailboxContent.newBuilder().setMailboxId(mailboxId)
-        .setPayload(ByteString.copyFrom(new DataTableBlock(dataTable).toBytes()));
+        .setPayload(ByteString.copyFrom(new TransferableBlock(dataTable).toBytes()));
     if (isEndOfStream) {
       builder.putMetadata("finished", "true");
     }
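
A simplified sketch of the HASH_DISTRIBUTED fan-out above: rows are bucketed by the key selector and each bucket is rebuilt into its own block for one receiving instance. This sketch uses Math.floorMod to keep the bucket index non-negative, whereas hashToIndex() in the patch applies a plain modulo; the wrapper class and method names are illustrative.

    import java.util.ArrayList;
    import java.util.List;
    import org.apache.pinot.common.utils.DataSchema;
    import org.apache.pinot.query.planner.partitioning.KeySelector;
    import org.apache.pinot.query.runtime.blocks.BaseDataBlock;
    import org.apache.pinot.query.runtime.blocks.DataBlockBuilder;

    public final class HashPartitionSketch {
      public static List<BaseDataBlock> partition(List<Object[]> rows, DataSchema schema,
          KeySelector<Object[], Object> keySelector, int partitionSize)
          throws Exception {
        List<List<Object[]>> buckets = new ArrayList<>(partitionSize);
        for (int i = 0; i < partitionSize; i++) {
          buckets.add(new ArrayList<>());
        }
        for (Object[] row : rows) {
          // Bucket by the partition key, mirroring constructPartitionedDataBlock().
          int index = Math.floorMod(keySelector.getKey(row).hashCode(), partitionSize);
          buckets.get(index).add(row);
        }
        List<BaseDataBlock> blocks = new ArrayList<>(partitionSize);
        for (List<Object[]> bucket : buckets) {
          blocks.add(DataBlockBuilder.buildFromRows(bucket, schema));
        }
        return blocks;
      }
    }
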
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/QueryDispatcher.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/QueryDispatcher.java
index 7ea69b8d21..fa9f04808c 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/QueryDispatcher.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/QueryDispatcher.java
@@ -33,8 +33,9 @@ import org.apache.pinot.query.mailbox.MailboxService;
 import org.apache.pinot.query.planner.QueryPlan;
 import org.apache.pinot.query.planner.StageMetadata;
 import org.apache.pinot.query.planner.stage.MailboxReceiveNode;
-import org.apache.pinot.query.runtime.blocks.DataTableBlock;
-import org.apache.pinot.query.runtime.blocks.DataTableBlockUtils;
+import org.apache.pinot.query.runtime.blocks.BaseDataBlock;
+import org.apache.pinot.query.runtime.blocks.DataBlockUtils;
+import org.apache.pinot.query.runtime.blocks.TransferableBlock;
 import org.apache.pinot.query.runtime.operator.MailboxReceiveOperator;
 import org.apache.pinot.query.runtime.plan.DistributedStagePlan;
 import org.apache.pinot.query.runtime.plan.serde.QueryPlanSerDeUtils;
@@ -58,24 +59,13 @@ public class QueryDispatcher {
       throws Exception {
     // submit all the distributed stages.
     int reduceStageId = submit(requestId, queryPlan);
-
-    // run reduce stage.
+    // run reduce stage and return result.
     MailboxReceiveNode reduceNode = (MailboxReceiveNode) queryPlan.getQueryStageMap().get(reduceStageId);
     MailboxReceiveOperator mailboxReceiveOperator = createReduceStageOperator(mailboxService,
         queryPlan.getStageMetadataMap().get(reduceNode.getSenderStageId()).getServerInstances(),
         requestId, reduceNode.getSenderStageId(), mailboxService.getHostname(),
         mailboxService.getMailboxPort());
-
-    List<DataTable> queryResults = new ArrayList<>();
-    long timeoutWatermark = System.nanoTime() + timeoutNano;
-    while (System.nanoTime() < timeoutWatermark) {
-      DataTableBlock dataTableBlock = mailboxReceiveOperator.nextBlock();
-      queryResults.add(dataTableBlock.getDataTable());
-      if (DataTableBlockUtils.isEndOfStream(dataTableBlock)) {
-        break;
-      }
-    }
-    return queryResults;
+    return reduceMailboxReceive(mailboxReceiveOperator);
   }
 
   public int submit(long requestId, QueryPlan queryPlan)
@@ -109,12 +99,9 @@ public class QueryDispatcher {
     return reduceStageId;
   }
 
-  protected MailboxReceiveOperator createReduceStageOperator(MailboxService<Mailbox.MailboxContent> mailboxService,
-      List<ServerInstance> sendingInstances, long jobId, int stageId, String hostname, int port) {
-    MailboxReceiveOperator mailboxReceiveOperator =
-        new MailboxReceiveOperator(mailboxService, RelDistribution.Type.ANY, sendingInstances, hostname, port, jobId,
-            stageId);
-    return mailboxReceiveOperator;
+  private DispatchClient getOrCreateDispatchClient(String host, int port) {
+    String key = String.format("%s_%d", host, port);
+    return _dispatchClientMap.computeIfAbsent(key, k -> new DispatchClient(host, port));
   }
 
   public static DistributedStagePlan constructDistributedStagePlan(QueryPlan queryPlan, int stageId,
@@ -123,9 +110,28 @@ public class QueryDispatcher {
         queryPlan.getStageMetadataMap());
   }
 
-  private DispatchClient getOrCreateDispatchClient(String host, int port) {
-    String key = String.format("%s_%d", host, port);
-    return _dispatchClientMap.computeIfAbsent(key, k -> new DispatchClient(host, port));
+  public static List<DataTable> reduceMailboxReceive(MailboxReceiveOperator mailboxReceiveOperator) {
+    List<DataTable> resultDataBlocks = new ArrayList<>();
+    TransferableBlock transferableBlock;
+    while (true) {
+      transferableBlock = mailboxReceiveOperator.nextBlock();
+      if (DataBlockUtils.isEndOfStream(transferableBlock)) {
+        break;
+      }
+      if (transferableBlock.getDataBlock() != null) {
+        BaseDataBlock dataTable = transferableBlock.getDataBlock();
+        resultDataBlocks.add(dataTable);
+      }
+    }
+    return resultDataBlocks;
+  }
+
+  public static MailboxReceiveOperator createReduceStageOperator(MailboxService<Mailbox.MailboxContent> mailboxService,
+      List<ServerInstance> sendingInstances, long jobId, int stageId, String hostname, int port) {
+    MailboxReceiveOperator mailboxReceiveOperator =
+        new MailboxReceiveOperator(mailboxService, RelDistribution.Type.ANY, sendingInstances, hostname, port, jobId,
+            stageId);
+    return mailboxReceiveOperator;
   }
 
   public static class DispatchClient {
diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/mailbox/GrpcMailboxServiceTest.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/mailbox/GrpcMailboxServiceTest.java
index f1863bc339..e254778db1 100644
--- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/mailbox/GrpcMailboxServiceTest.java
+++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/mailbox/GrpcMailboxServiceTest.java
@@ -24,8 +24,8 @@ import com.google.protobuf.ByteString;
 import java.io.IOException;
 import java.util.Map;
 import org.apache.pinot.common.proto.Mailbox;
-import org.apache.pinot.core.common.datatable.DataTableBuilder;
-import org.apache.pinot.query.runtime.blocks.DataTableBlock;
+import org.apache.pinot.query.runtime.blocks.DataBlockUtils;
+import org.apache.pinot.query.runtime.blocks.TransferableBlock;
 import org.apache.pinot.util.TestUtils;
 import org.testng.Assert;
 import org.testng.annotations.Test;
@@ -67,6 +67,7 @@ public class GrpcMailboxServiceTest extends GrpcMailboxServiceTestBase {
       throws IOException {
     return Mailbox.MailboxContent.newBuilder().setMailboxId(mailboxId)
         .putAllMetadata(ImmutableMap.of("key", "value", "finished", "true"))
-        .setPayload(ByteString.copyFrom(new DataTableBlock(DataTableBuilder.getEmptyDataTable()).toBytes())).build();
+        .setPayload(ByteString.copyFrom(new TransferableBlock(DataBlockUtils.getEndOfStreamDataBlock()).toBytes()))
+        .build();
   }
 }
diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/QueryRunnerTest.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/QueryRunnerTest.java
index 2316195bce..d133584797 100644
--- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/QueryRunnerTest.java
+++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/QueryRunnerTest.java
@@ -27,7 +27,6 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Random;
-import org.apache.calcite.rel.RelDistribution;
 import org.apache.commons.io.FileUtils;
 import org.apache.pinot.common.utils.DataTable;
 import org.apache.pinot.core.transport.ServerInstance;
@@ -38,8 +37,6 @@ import org.apache.pinot.query.mailbox.GrpcMailboxService;
 import org.apache.pinot.query.planner.QueryPlan;
 import org.apache.pinot.query.planner.stage.MailboxReceiveNode;
 import org.apache.pinot.query.routing.WorkerInstance;
-import org.apache.pinot.query.runtime.blocks.DataTableBlock;
-import org.apache.pinot.query.runtime.blocks.DataTableBlockUtils;
 import org.apache.pinot.query.runtime.operator.MailboxReceiveOperator;
 import org.apache.pinot.query.runtime.plan.DistributedStagePlan;
 import org.apache.pinot.query.service.QueryConfig;
@@ -84,8 +81,8 @@ public class QueryRunnerTest {
     _mailboxService = new GrpcMailboxService(_reducerHostname, _reducerGrpcPort);
     _mailboxService.start();
 
-    _queryEnvironment =
-        QueryEnvironmentTestUtils.getQueryEnvironment(_reducerGrpcPort, server1.getPort(), server2.getPort());
+    _queryEnvironment = QueryEnvironmentTestUtils.getQueryEnvironment(_reducerGrpcPort, server1.getPort(),
+        server2.getPort());
     server1.start();
     server2.start();
     // this doesn't test the QueryServer functionality so the server port can be the same as the mailbox port.
@@ -111,9 +108,10 @@ public class QueryRunnerTest {
     for (int stageId : queryPlan.getStageMetadataMap().keySet()) {
       if (queryPlan.getQueryStageMap().get(stageId) instanceof MailboxReceiveNode) {
         MailboxReceiveNode reduceNode = (MailboxReceiveNode) queryPlan.getQueryStageMap().get(stageId);
-        mailboxReceiveOperator = createReduceStageOperator(
+        mailboxReceiveOperator = QueryDispatcher.createReduceStageOperator(_mailboxService,
             queryPlan.getStageMetadataMap().get(reduceNode.getSenderStageId()).getServerInstances(),
-            Long.parseLong(requestMetadataMap.get("REQUEST_ID")), reduceNode.getSenderStageId(), _reducerGrpcPort);
+            Long.parseLong(requestMetadataMap.get("REQUEST_ID")), reduceNode.getSenderStageId(), "localhost",
+            _reducerGrpcPort);
       } else {
         for (ServerInstance serverInstance : queryPlan.getStageMetadataMap().get(stageId).getServerInstances()) {
           DistributedStagePlan distributedStagePlan =
@@ -124,10 +122,21 @@ public class QueryRunnerTest {
     }
     Preconditions.checkNotNull(mailboxReceiveOperator);
 
-    List<Object[]> resultRows = reduceMailboxReceive(mailboxReceiveOperator);
+    List<Object[]> resultRows = toRows(QueryDispatcher.reduceMailboxReceive(mailboxReceiveOperator));
     Assert.assertEquals(resultRows.size(), expectedRowCount);
   }
 
+  private static List<Object[]> toRows(List<DataTable> dataTables) {
+    List<Object[]> resultRows = new ArrayList<>();
+    for (DataTable dataTable : dataTables) {
+      int numRows = dataTable.getNumberOfRows();
+      for (int rowId = 0; rowId < numRows; rowId++) {
+        resultRows.add(extractRowFromDataTable(dataTable, rowId));
+      }
+    }
+    return resultRows;
+  }
+
   @DataProvider(name = "testDataWithSqlToFinalRowCount")
   private Object[][] provideTestSqlAndRowCount() {
     return new Object[][] {
@@ -152,31 +161,4 @@ public class QueryRunnerTest {
             + " WHERE a.col3 >= 0 AND a.col2 = 'foo' AND b.col3 >= 0", 3},
     };
   }
-
-  protected static List<Object[]> reduceMailboxReceive(MailboxReceiveOperator mailboxReceiveOperator) {
-    List<Object[]> resultRows = new ArrayList<>();
-    DataTableBlock dataTableBlock;
-    while (true) {
-      dataTableBlock = mailboxReceiveOperator.nextBlock();
-      if (DataTableBlockUtils.isEndOfStream(dataTableBlock)) {
-        break;
-      }
-      if (dataTableBlock.getDataTable() != null) {
-        DataTable dataTable = dataTableBlock.getDataTable();
-        int numRows = dataTable.getNumberOfRows();
-        for (int rowId = 0; rowId < numRows; rowId++) {
-          resultRows.add(extractRowFromDataTable(dataTable, rowId));
-        }
-      }
-    }
-    return resultRows;
-  }
-
-  protected MailboxReceiveOperator createReduceStageOperator(List<ServerInstance> sendingInstances, long jobId,
-      int stageId, int port) {
-    MailboxReceiveOperator mailboxReceiveOperator =
-        new MailboxReceiveOperator(_mailboxService, RelDistribution.Type.ANY, sendingInstances, "localhost", port,
-            jobId, stageId);
-    return mailboxReceiveOperator;
-  }
 }
diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/blocks/DataBlockTest.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/blocks/DataBlockTest.java
new file mode 100644
index 0000000000..bf33160b92
--- /dev/null
+++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/blocks/DataBlockTest.java
@@ -0,0 +1,80 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.query.runtime.blocks;
+
+import java.io.IOException;
+import java.util.List;
+import org.apache.pinot.common.exception.QueryException;
+import org.apache.pinot.common.response.ProcessingException;
+import org.apache.pinot.common.utils.DataSchema;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+
+public class DataBlockTest {
+  private static final int TEST_ROW_COUNT = 2;
+
+  @Test
+  public void testException()
+      throws IOException {
+    Exception originalException = new UnsupportedOperationException("Expected test exception.");
+    ProcessingException processingException =
+        QueryException.getException(QueryException.QUERY_EXECUTION_ERROR, originalException);
+    String expected = processingException.getMessage();
+
+    BaseDataBlock dataBlock = DataBlockUtils.getErrorDataBlock(originalException);
+    dataBlock.addException(processingException);
+    Assert.assertEquals(dataBlock.getDataSchema().getColumnNames().length, 0);
+    Assert.assertEquals(dataBlock.getDataSchema().getColumnDataTypes().length, 0);
+    Assert.assertEquals(dataBlock.getNumberOfRows(), 0);
+
+    // Assert processing exception and original exception both matches.
+    String actual = dataBlock.getExceptions().get(QueryException.QUERY_EXECUTION_ERROR.getErrorCode());
+    Assert.assertEquals(actual, expected);
+    Assert.assertEquals(dataBlock.getExceptions().get(QueryException.UNKNOWN_ERROR_CODE),
+        originalException.getMessage());
+  }
+
+  @Test
+  public void testAllDataTypes()
+      throws IOException {
+    DataSchema.ColumnDataType[] columnDataTypes = DataSchema.ColumnDataType.values();
+    int numColumns = columnDataTypes.length;
+    String[] columnNames = new String[numColumns];
+    for (int i = 0; i < numColumns; i++) {
+      columnNames[i] = columnDataTypes[i].name();
+    }
+
+    DataSchema dataSchema = new DataSchema(columnNames, columnDataTypes);
+    List<Object[]> rows = DataBlockTestUtils.getRandomRows(dataSchema, TEST_ROW_COUNT);
+    List<Object[]> columnars = DataBlockTestUtils.convertColumnar(dataSchema, rows);
+    RowDataBlock rowBlock = DataBlockBuilder.buildFromRows(rows, dataSchema);
+    ColumnarDataBlock columnarBlock = DataBlockBuilder.buildFromColumns(columnars, dataSchema);
+
+    for (int colId = 0; colId < dataSchema.getColumnNames().length; colId++) {
+      DataSchema.ColumnDataType columnDataType = dataSchema.getColumnDataType(colId);
+      for (int rowId = 0; rowId < TEST_ROW_COUNT; rowId++) {
+        Object rowVal = DataBlockTestUtils.getElement(rowBlock, rowId, colId, columnDataType);
+        Object colVal = DataBlockTestUtils.getElement(columnarBlock, rowId, colId, columnDataType);
+        Assert.assertEquals(rowVal, colVal, "Error comparing Row/Column Block at (" + rowId + "," + colId + ")"
+            + " of Type: " + columnDataType + "! rowValue: [" + rowVal + "], columnarValue: [" + colVal + "]");
+      }
+    }
+  }
+}
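
One round trip worth noting alongside the test above (a fragment meant to live inside a test method in the same package, not part of the patch): a block built by DataBlockBuilder can be serialized with toBytes() and re-read through DataBlockUtils.getDataBlock(), which is the path the mailbox send/receive operators exercise. The column names below are hypothetical and Assert is TestNG's.

    // Sketch: serialize a row block and read it back through the ByteBuffer-based factory.
    DataSchema dataSchema = new DataSchema(new String[]{"intCol", "strCol"},
        new DataSchema.ColumnDataType[]{DataSchema.ColumnDataType.INT, DataSchema.ColumnDataType.STRING});
    List<Object[]> rows = DataBlockTestUtils.getRandomRows(dataSchema, 4);
    RowDataBlock rowBlock = DataBlockBuilder.buildFromRows(rows, dataSchema);
    BaseDataBlock roundTripped = DataBlockUtils.getDataBlock(java.nio.ByteBuffer.wrap(rowBlock.toBytes()));
    Assert.assertEquals(roundTripped.getNumberOfRows(), rowBlock.getNumberOfRows());
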
diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/blocks/DataBlockTestUtils.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/blocks/DataBlockTestUtils.java
new file mode 100644
index 0000000000..caa9b55ccf
--- /dev/null
+++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/blocks/DataBlockTestUtils.java
@@ -0,0 +1,181 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.query.runtime.blocks;
+
+import java.math.BigDecimal;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Random;
+import org.apache.commons.lang.RandomStringUtils;
+import org.apache.pinot.common.utils.DataSchema;
+import org.apache.pinot.spi.utils.ByteArray;
+
+
+public class DataBlockTestUtils {
+  private static final Random RANDOM = new Random();
+  private static final int ARRAY_SIZE = 5;
+
+  private DataBlockTestUtils() {
+    // do not instantiate.
+  }
+
+  public static Object[] getRandomRow(DataSchema dataSchema) {
+    final int numColumns = dataSchema.getColumnNames().length;
+    DataSchema.ColumnDataType[] columnDataTypes = dataSchema.getColumnDataTypes();
+    Object[] row = new Object[numColumns];
+    for (int colId = 0; colId < numColumns; colId++) {
+      switch (columnDataTypes[colId].getStoredType()) {
+        case INT:
+          row[colId] = RANDOM.nextInt();
+          break;
+        case LONG:
+          row[colId] = RANDOM.nextLong();
+          break;
+        case FLOAT:
+          row[colId] = RANDOM.nextFloat();
+          break;
+        case DOUBLE:
+          row[colId] = RANDOM.nextDouble();
+          break;
+        case BIG_DECIMAL:
+          row[colId] = BigDecimal.valueOf(RANDOM.nextDouble());
+          break;
+        case STRING:
+          row[colId] = RandomStringUtils.random(RANDOM.nextInt(20));
+          break;
+        case BYTES:
+          row[colId] = new ByteArray(RandomStringUtils.random(RANDOM.nextInt(20)).getBytes());
+          break;
+        // Just test Double here, all object types will be covered in ObjectCustomSerDeTest.
+        case OBJECT:
+          row[colId] = RANDOM.nextDouble();
+          break;
+        case BOOLEAN_ARRAY:
+        case INT_ARRAY:
+          int length = RANDOM.nextInt(ARRAY_SIZE);
+          int[] intArray = new int[length];
+          for (int i = 0; i < length; i++) {
+            intArray[i] = RANDOM.nextInt();
+          }
+          row[colId] = intArray;
+          break;
+        case TIMESTAMP_ARRAY:
+        case LONG_ARRAY:
+          length = RANDOM.nextInt(ARRAY_SIZE);
+          long[] longArray = new long[length];
+          for (int i = 0; i < length; i++) {
+            longArray[i] = RANDOM.nextLong();
+          }
+          row[colId] = longArray;
+          break;
+        case FLOAT_ARRAY:
+          length = RANDOM.nextInt(ARRAY_SIZE);
+          float[] floatArray = new float[length];
+          for (int i = 0; i < length; i++) {
+            floatArray[i] = RANDOM.nextFloat();
+          }
+          row[colId] = floatArray;
+          break;
+        case DOUBLE_ARRAY:
+          length = RANDOM.nextInt(ARRAY_SIZE);
+          double[] doubleArray = new double[length];
+          for (int i = 0; i < length; i++) {
+            doubleArray[i] = RANDOM.nextDouble();
+          }
+          row[colId] = doubleArray;
+          break;
+        case BYTES_ARRAY:
+        case STRING_ARRAY:
+          length = RANDOM.nextInt(ARRAY_SIZE);
+          String[] stringArray = new String[length];
+          for (int i = 0; i < length; i++) {
+            stringArray[i] = RandomStringUtils.random(RANDOM.nextInt(20));
+          }
+          row[colId] = stringArray;
+          break;
+        default:
+          throw new UnsupportedOperationException("Can't fill random data for column type: " + columnDataTypes[colId]);
+      }
+    }
+    return row;
+  }
+
+  public static Object getElement(BaseDataBlock dataBlock, int rowId, int colId,
+      DataSchema.ColumnDataType columnDataType) {
+    switch (columnDataType.getStoredType()) {
+      case INT:
+        return dataBlock.getInt(rowId, colId);
+      case LONG:
+        return dataBlock.getLong(rowId, colId);
+      case FLOAT:
+        return dataBlock.getFloat(rowId, colId);
+      case DOUBLE:
+        return dataBlock.getDouble(rowId, colId);
+      case BIG_DECIMAL:
+        return dataBlock.getBigDecimal(rowId, colId);
+      case STRING:
+        return dataBlock.getString(rowId, colId);
+      case BYTES:
+        return dataBlock.getBytes(rowId, colId);
+      case OBJECT:
+        return dataBlock.getObject(rowId, colId);
+      case BOOLEAN_ARRAY:
+      case INT_ARRAY:
+        return dataBlock.getIntArray(rowId, colId);
+      case TIMESTAMP_ARRAY:
+      case LONG_ARRAY:
+        return dataBlock.getLongArray(rowId, colId);
+      case FLOAT_ARRAY:
+        return dataBlock.getFloatArray(rowId, colId);
+      case DOUBLE_ARRAY:
+        return dataBlock.getDoubleArray(rowId, colId);
+      case BYTES_ARRAY:
+      case STRING_ARRAY:
+        return dataBlock.getStringArray(rowId, colId);
+      default:
+        throw new UnsupportedOperationException("Can't retrieve data for column type: " + columnDataType);
+    }
+  }
+
+  public static List<Object[]> getRandomRows(DataSchema dataSchema, int numRows) {
+    List<Object[]> rows = new ArrayList<>(numRows);
+    for (int i = 0; i < numRows; i++) {
+      rows.add(getRandomRow(dataSchema));
+    }
+    return rows;
+  }
+
+  public static List<Object[]> getRandomColumnar(DataSchema dataSchema, int numRows) {
+    List<Object[]> rows = getRandomRows(dataSchema, numRows);
+    return convertColumnar(dataSchema, rows);
+  }
+
+  public static List<Object[]> convertColumnar(DataSchema dataSchema, List<Object[]> rows) {
+    final int numRows = rows.size();
+    final int numColumns = dataSchema.getColumnNames().length;
+    List<Object[]> columnars = new ArrayList<>(numColumns);
+    for (int colId = 0; colId < numColumns; colId++) {
+      columnars.add(new Object[numRows]);
+      for (int rowId = 0; rowId < numRows; rowId++) {
+        columnars.get(colId)[rowId] = rows.get(rowId)[colId];
+      }
+    }
+    return columnars;
+  }
+}
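
For readers following the patch, here is a minimal sketch (not part of the
commit) of how the helpers above fit together: getRandomRows produces
row-major test data, and convertColumnar transposes it into one Object[] per
column, so an element-wise comparison should hold after the transpose. The
two-column schema, the column names, and the use of TestNG's Assert are
illustrative assumptions, not taken from this change.

    // Illustrative only: the schema and column names are made up for this sketch.
    DataSchema schema = new DataSchema(new String[]{"intCol", "strCol"},
        new DataSchema.ColumnDataType[]{
            DataSchema.ColumnDataType.INT, DataSchema.ColumnDataType.STRING});
    List<Object[]> rows = DataBlockTestUtils.getRandomRows(schema, 4);
    List<Object[]> columns = DataBlockTestUtils.convertColumnar(schema, rows);
    // The transpose should map rows.get(rowId)[colId] to columns.get(colId)[rowId].
    for (int colId = 0; colId < schema.size(); colId++) {
      for (int rowId = 0; rowId < rows.size(); rowId++) {
        Assert.assertEquals(columns.get(colId)[rowId], rows.get(rowId)[colId]);
      }
    }
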
diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/service/QueryDispatcherTest.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/service/QueryDispatcherTest.java
new file mode 100644
index 0000000000..9057d1a7ec
--- /dev/null
+++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/service/QueryDispatcherTest.java
@@ -0,0 +1,92 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.query.service;
+
+import com.google.common.collect.Lists;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import org.apache.pinot.query.QueryEnvironment;
+import org.apache.pinot.query.QueryEnvironmentTestUtils;
+import org.apache.pinot.query.planner.PlannerUtils;
+import org.apache.pinot.query.planner.QueryPlan;
+import org.apache.pinot.query.runtime.QueryRunner;
+import org.mockito.Mockito;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.DataProvider;
+import org.testng.annotations.Test;
+
+
+public class QueryDispatcherTest {
+  private static final Random RANDOM_REQUEST_ID_GEN = new Random();
+  private static final int QUERY_SERVER_COUNT = 2;
+  private final Map<Integer, QueryServer> _queryServerMap = new HashMap<>();
+  private final Map<Integer, QueryRunner> _queryRunnerMap = new HashMap<>();
+
+  private QueryEnvironment _queryEnvironment;
+
+  @BeforeClass
+  public void setUp()
+      throws Exception {
+
+    for (int i = 0; i < QUERY_SERVER_COUNT; i++) {
+      int availablePort = QueryEnvironmentTestUtils.getAvailablePort();
+      QueryRunner queryRunner = Mockito.mock(QueryRunner.class);
+      QueryServer queryServer = new QueryServer(availablePort, queryRunner);
+      queryServer.start();
+      _queryServerMap.put(availablePort, queryServer);
+      _queryRunnerMap.put(availablePort, queryRunner);
+    }
+
+    List<Integer> portList = Lists.newArrayList(_queryServerMap.keySet());
+
+    // The reducer port doesn't matter; we are testing the worker instances, not gRPC.
+    _queryEnvironment = QueryEnvironmentTestUtils.getQueryEnvironment(1, portList.get(0), portList.get(1));
+  }
+
+  @AfterClass
+  public void tearDown() {
+    for (QueryServer worker : _queryServerMap.values()) {
+      worker.shutdown();
+    }
+  }
+
+  @Test(dataProvider = "testDataWithSqlToCompiledAsWorkerRequest")
+  public void testQueryDispatcherCanSendCorrectPayload(String sql)
+      throws Exception {
+    QueryPlan queryPlan = _queryEnvironment.planQuery(sql);
+    QueryDispatcher dispatcher = new QueryDispatcher();
+    int reducerStageId = dispatcher.submit(RANDOM_REQUEST_ID_GEN.nextLong(), queryPlan);
+    Assert.assertTrue(PlannerUtils.isRootStage(reducerStageId));
+  }
+
+  @DataProvider(name = "testDataWithSqlToCompiledAsWorkerRequest")
+  private Object[][] provideTestSqlToCompiledToWorkerRequest() {
+    return new Object[][] {
+        new Object[]{"SELECT * FROM b"},
+        new Object[]{"SELECT * FROM a"},
+        new Object[]{"SELECT * FROM a JOIN b ON a.col3 = b.col3"},
+        new Object[]{"SELECT a.col1, a.ts, c.col2, c.col3 FROM a JOIN c ON a.col1 = c.col2 "
+            + " WHERE (a.col3 >= 0 OR a.col2 = 'foo') AND c.col3 >= 0"},
+    };
+  }
+}

