Posted to commits@pinot.apache.org by ro...@apache.org on 2022/11/10 19:41:31 UTC

[pinot] branch master updated: [multistage] leaf-node return data in non-serialized format (#9755)

This is an automated email from the ASF dual-hosted git repository.

rongr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 1de904f545 [multistage] leaf-node return data in non-serialized format (#9755)
1de904f545 is described below

commit 1de904f54576540cabd425f02134e4b10597ddf5
Author: Rong Rong <ro...@apache.org>
AuthorDate: Thu Nov 10 11:41:24 2022 -0800

    [multistage] leaf-node return data in non-serialized format (#9755)
    
    This PR directly uses the InstanceResponseBlock to get the non-serialized rows. This
    - avoids the useless ser/de
    - splits data into chunks more directly via the List API instead of via the block API
    
    Co-authored-by: Rong Rong <ro...@startree.ai>
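
As a minimal standalone sketch of the List-API chunking idea (the class and helper names below are illustrative, not part of this patch), splitting rows with List#subList avoids rebuilding any block structure:

    import java.util.ArrayList;
    import java.util.List;

    public final class ListChunkSketch {
      // Split rows into chunks of at most numRowsPerChunk via List#subList,
      // mirroring the approach splitBlock() takes in this patch.
      static List<List<Object[]>> chunk(List<Object[]> rows, int numRowsPerChunk) {
        List<List<Object[]>> chunks = new ArrayList<>();
        for (int start = 0; start < rows.size(); start += numRowsPerChunk) {
          chunks.add(rows.subList(start, Math.min(start + numRowsPerChunk, rows.size())));
        }
        return chunks;
      }

      public static void main(String[] args) {
        List<Object[]> rows = new ArrayList<>();
        for (int i = 0; i < 10; i++) {
          rows.add(new Object[]{i, "row-" + i});
        }
        chunk(rows, 4).forEach(c -> System.out.println(c.size())); // prints 4, 4, 2
      }
    }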
---
 .../core/common/datablock/DataBlockBuilder.java    |   5 +-
 .../pinot/core/common/datablock/DataBlockTest.java | 101 ---------------------
 .../core/common/datablock/DataBlockTestUtils.java  |   8 +-
 .../apache/pinot/query/runtime/QueryRunner.java    |  71 +++++++++------
 .../query/runtime/blocks/TransferableBlock.java    |   7 ++
 .../runtime/blocks/TransferableBlockUtils.java     |  80 ++++++++++++----
 .../runtime/operator/MailboxSendOperator.java      |  15 +--
 .../query/runtime/plan/PhysicalPlanVisitor.java    |   2 +-
 .../query/runtime/TransferableBlockUtilsTest.java  |   9 +-
 .../testutils/MockDataBlockOperatorFactory.java    |   4 +-
 10 files changed, 135 insertions(+), 167 deletions(-)

diff --git a/pinot-core/src/main/java/org/apache/pinot/core/common/datablock/DataBlockBuilder.java b/pinot-core/src/main/java/org/apache/pinot/core/common/datablock/DataBlockBuilder.java
index 4524a733e8..cf84568fcb 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/common/datablock/DataBlockBuilder.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/common/datablock/DataBlockBuilder.java
@@ -28,7 +28,6 @@ import java.nio.ByteBuffer;
 import java.sql.Timestamp;
 import java.util.List;
 import javax.annotation.Nullable;
-import org.apache.pinot.common.datablock.BaseDataBlock;
 import org.apache.pinot.common.datablock.ColumnarDataBlock;
 import org.apache.pinot.common.datablock.DataBlock;
 import org.apache.pinot.common.datablock.DataBlockUtils;
@@ -45,7 +44,7 @@ import org.roaringbitmap.RoaringBitmap;
 
 public class DataBlockBuilder {
   private final DataSchema _dataSchema;
-  private final BaseDataBlock.Type _blockType;
+  private final DataBlock.Type _blockType;
   private final DataSchema.ColumnDataType[] _columnDataTypes;
 
   private int[] _columnOffsets;
@@ -63,7 +62,7 @@ public class DataBlockBuilder {
   private final DataOutputStream _variableSizeDataOutputStream =
       new DataOutputStream(_variableSizeDataByteArrayOutputStream);
 
-  private DataBlockBuilder(DataSchema dataSchema, BaseDataBlock.Type blockType) {
+  private DataBlockBuilder(DataSchema dataSchema, DataBlock.Type blockType) {
     _dataSchema = dataSchema;
     _columnDataTypes = dataSchema.getColumnDataTypes();
     _blockType = blockType;
diff --git a/pinot-core/src/test/java/org/apache/pinot/core/common/datablock/DataBlockTest.java b/pinot-core/src/test/java/org/apache/pinot/core/common/datablock/DataBlockTest.java
index 72f6cd93ca..64425c84c1 100644
--- a/pinot-core/src/test/java/org/apache/pinot/core/common/datablock/DataBlockTest.java
+++ b/pinot-core/src/test/java/org/apache/pinot/core/common/datablock/DataBlockTest.java
@@ -20,23 +20,15 @@ package org.apache.pinot.core.common.datablock;
 
 import com.google.common.collect.ImmutableList;
 import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.sql.Timestamp;
 import java.util.ArrayList;
 import java.util.List;
 import org.apache.pinot.common.datablock.ColumnarDataBlock;
 import org.apache.pinot.common.datablock.DataBlock;
 import org.apache.pinot.common.datablock.DataBlockUtils;
 import org.apache.pinot.common.datablock.RowDataBlock;
-import org.apache.pinot.common.datatable.DataTable;
-import org.apache.pinot.common.datatable.DataTableFactory;
 import org.apache.pinot.common.exception.QueryException;
 import org.apache.pinot.common.response.ProcessingException;
 import org.apache.pinot.common.utils.DataSchema;
-import org.apache.pinot.core.common.datatable.DataTableBuilderFactory;
-import org.apache.pinot.core.query.selection.SelectionOperatorUtils;
-import org.apache.pinot.spi.utils.ArrayCopyUtils;
-import org.roaringbitmap.RoaringBitmap;
 import org.testng.Assert;
 import org.testng.annotations.DataProvider;
 import org.testng.annotations.Test;
@@ -67,99 +59,6 @@ public class DataBlockTest {
         originalException.getMessage());
   }
 
-  /**
-   * This test is only here to ensure that {@link org.apache.pinot.core.query.executor.ServerQueryExecutorV1Impl}
-   * producing data table can actually be wrapped and sent via mailbox in the {@link RowDataBlock} format.
-   *
-   * @throws Exception
-   */
-  @Test(dataProvider = "testTypeNullPercentile")
-  public void testRowDataBlockCompatibleWithDataTableV4(int nullPercentile)
-      throws Exception {
-    DataSchema.ColumnDataType[] allDataTypes = DataSchema.ColumnDataType.values();
-    List<DataSchema.ColumnDataType> columnDataTypes = new ArrayList<DataSchema.ColumnDataType>();
-    List<String> columnNames = new ArrayList<String>();
-    for (int i = 0; i < allDataTypes.length; i++) {
-      if (!EXCLUDE_DATA_TYPES.contains(allDataTypes[i])) {
-        columnNames.add(allDataTypes[i].name());
-        columnDataTypes.add(allDataTypes[i]);
-      }
-    }
-
-    DataSchema dataSchema = new DataSchema(columnNames.toArray(new String[0]),
-        columnDataTypes.toArray(new DataSchema.ColumnDataType[0]));
-    int numColumns = dataSchema.getColumnDataTypes().length;
-    List<Object[]> rows = DataBlockTestUtils.getRandomRows(dataSchema, TEST_ROW_COUNT, nullPercentile);
-    DataTableBuilderFactory.setDataTableVersion(DataTableFactory.VERSION_4);
-    convertToDataTableCompatibleRows(rows, dataSchema);
-    DataTable dataTableImpl = SelectionOperatorUtils.getDataTableFromRows(rows, dataSchema, true);
-    DataBlock dataBlockFromDataTable = DataBlockUtils.getDataBlock(ByteBuffer.wrap(dataTableImpl.toBytes()));
-
-    RoaringBitmap[] nullBitmaps = new RoaringBitmap[numColumns];
-    for (int coldId = 0; coldId < numColumns; coldId++) {
-      nullBitmaps[coldId] = dataTableImpl.getNullRowIds(coldId);
-    }
-
-    List<Object[]> rowsFromBlock = DataBlockUtils.extractRows(dataBlockFromDataTable);
-    for (int rowId = 0; rowId < TEST_ROW_COUNT; rowId++) {
-      Object[] rowFromDataTable = SelectionOperatorUtils.extractRowFromDataTableWithNullHandling(dataTableImpl, rowId,
-          nullBitmaps);
-      Object[] rowFromBlock = rowsFromBlock.get(rowId);
-      for (int colId = 0; colId < numColumns; colId++) {
-        Object dataTableObj = rowFromDataTable[colId] == null ? null
-            : dataSchema.getColumnDataType(colId).convert(rowFromDataTable[colId]);
-        Object dataBlockObj = rowFromBlock[colId];
-        Assert.assertEquals(dataBlockObj, dataTableObj, "Error comparing Row/Column Block "
-            + " at (" + rowId + "," + colId + ") of Type: " + dataSchema.getColumnDataType(colId) + "! "
-            + " from DataBlock: [" + dataBlockObj + "], from DataTable: [" + dataTableObj + "]");
-      }
-    }
-
-    for (int colId = 0; colId < dataSchema.getColumnNames().length; colId++) {
-      RoaringBitmap dataBlockBitmap = dataBlockFromDataTable.getNullRowIds(colId);
-      RoaringBitmap dataTableBitmap = dataTableImpl.getNullRowIds(colId);
-      Assert.assertEquals(dataBlockBitmap, dataTableBitmap);
-    }
-  }
-
-  private static void convertToDataTableCompatibleRows(List<Object[]> rows, DataSchema dataSchema) {
-    int numColumns = dataSchema.getColumnNames().length;
-    for (int rowId = 0; rowId < rows.size(); rowId++) {
-      for (int colId = 0; colId < numColumns; colId++) {
-        switch (dataSchema.getColumnDataType(colId)) {
-          case BOOLEAN:
-            if (rows.get(rowId)[colId] != null) {
-              rows.get(rowId)[colId] = ((boolean) rows.get(rowId)[colId]) ? 1 : 0;
-            }
-            break;
-          case TIMESTAMP:
-            if (rows.get(rowId)[colId] != null) {
-              rows.get(rowId)[colId] = ((Timestamp) rows.get(rowId)[colId]).getTime();
-            }
-            break;
-          case BOOLEAN_ARRAY:
-            if (rows.get(rowId)[colId] != null) {
-              boolean[] booleans = (boolean[]) rows.get(rowId)[colId];
-              int[] ints = new int[booleans.length];
-              ArrayCopyUtils.copy(booleans, ints, booleans.length);
-              rows.get(rowId)[colId] = ints;
-            }
-            break;
-          case TIMESTAMP_ARRAY:
-            if (rows.get(rowId)[colId] != null) {
-              Timestamp[] timestamps = (Timestamp[]) rows.get(rowId)[colId];
-              long[] longs = new long[timestamps.length];
-              ArrayCopyUtils.copy(timestamps, longs, timestamps.length);
-              rows.get(rowId)[colId] = longs;
-            }
-            break;
-          default:
-            break;
-        }
-      }
-    }
-  }
-
   @Test(dataProvider = "testTypeNullPercentile")
   public void testAllDataTypes(int nullPercentile)
       throws Exception {
diff --git a/pinot-core/src/test/java/org/apache/pinot/core/common/datablock/DataBlockTestUtils.java b/pinot-core/src/test/java/org/apache/pinot/core/common/datablock/DataBlockTestUtils.java
index 427f727f55..67cc82cba6 100644
--- a/pinot-core/src/test/java/org/apache/pinot/core/common/datablock/DataBlockTestUtils.java
+++ b/pinot-core/src/test/java/org/apache/pinot/core/common/datablock/DataBlockTestUtils.java
@@ -24,7 +24,7 @@ import java.util.ArrayList;
 import java.util.List;
 import java.util.Random;
 import org.apache.commons.lang.RandomStringUtils;
-import org.apache.pinot.common.datablock.BaseDataBlock;
+import org.apache.pinot.common.datablock.DataBlock;
 import org.apache.pinot.common.utils.DataSchema;
 import org.apache.pinot.spi.utils.ByteArray;
 import org.roaringbitmap.RoaringBitmap;
@@ -60,7 +60,7 @@ public class DataBlockTestUtils {
           row[colId] = BigDecimal.valueOf(RANDOM.nextDouble());
           break;
         case BOOLEAN:
-          row[colId] = RANDOM.nextInt(2) % 2 == 1;
+          row[colId] = RANDOM.nextBoolean();
           break;
         case TIMESTAMP:
           row[colId] = new Timestamp(RANDOM.nextLong());
@@ -115,7 +115,7 @@ public class DataBlockTestUtils {
           length = RANDOM.nextInt(ARRAY_SIZE);
           boolean[] booleanArray = new boolean[length];
           for (int i = 0; i < length; i++) {
-            booleanArray[i] = RANDOM.nextInt(2) % 2 == 1;
+            booleanArray[i] = RANDOM.nextBoolean();
           }
           row[colId] = booleanArray;
           break;
@@ -138,7 +138,7 @@ public class DataBlockTestUtils {
     return row;
   }
 
-  public static Object getElement(BaseDataBlock dataBlock, int rowId, int colId,
+  public static Object getElement(DataBlock dataBlock, int rowId, int colId,
       DataSchema.ColumnDataType columnDataType) {
     RoaringBitmap nullBitmap = dataBlock.getNullRowIds(colId);
     if (nullBitmap != null) {
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java
index bf3872e2cb..d0e8a86489 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java
@@ -19,10 +19,12 @@
 package org.apache.pinot.query.runtime;
 
 import com.google.common.base.Preconditions;
-import java.nio.ByteBuffer;
 import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
 import java.util.List;
 import java.util.Map;
+import java.util.PriorityQueue;
 import java.util.concurrent.ExecutorService;
 import javax.annotation.Nullable;
 import org.apache.helix.HelixManager;
@@ -32,6 +34,7 @@ import org.apache.pinot.common.datablock.DataBlock;
 import org.apache.pinot.common.datablock.DataBlockUtils;
 import org.apache.pinot.common.datablock.MetadataBlock;
 import org.apache.pinot.common.datatable.DataTable;
+import org.apache.pinot.common.exception.QueryException;
 import org.apache.pinot.common.metadata.ZKMetadataProvider;
 import org.apache.pinot.common.metrics.ServerMetrics;
 import org.apache.pinot.common.utils.DataSchema;
@@ -39,6 +42,7 @@ import org.apache.pinot.core.common.Operator;
 import org.apache.pinot.core.data.manager.InstanceDataManager;
 import org.apache.pinot.core.operator.BaseOperator;
 import org.apache.pinot.core.operator.blocks.InstanceResponseBlock;
+import org.apache.pinot.core.operator.blocks.results.BaseResultsBlock;
 import org.apache.pinot.core.query.executor.ServerQueryExecutorV1Impl;
 import org.apache.pinot.core.query.request.ServerQueryRequest;
 import org.apache.pinot.core.transport.ServerInstance;
@@ -124,7 +128,7 @@ public class QueryRunner {
           requestMetadataMap, _helixPropertyStore, _mailboxService);
 
       // send the data table via mailbox in one-off fashion (e.g. no block-level split, one data table/partition key)
-      List<DataBlock> serverQueryResults = new ArrayList<>(serverQueryRequests.size());
+      List<InstanceResponseBlock> serverQueryResults = new ArrayList<>(serverQueryRequests.size());
       for (ServerPlanRequestContext requestContext : serverQueryRequests) {
         ServerQueryRequest request = new ServerQueryRequest(requestContext.getInstanceRequest(),
             new ServerMetrics(PinotMetricUtils.getPinotMetricsRegistry()), System.currentTimeMillis());
@@ -138,7 +142,7 @@ public class QueryRunner {
               new LeafStageTransferableBlockOperator(serverQueryResults, sendNode.getDataSchema()),
               receivingStageMetadata.getServerInstances(), sendNode.getExchangeType(),
               sendNode.getPartitionKeySelector(), _hostname, _port, serverQueryRequests.get(0).getRequestId(),
-              sendNode.getStageId());
+              sendNode.getStageId(), true);
       int blockCounter = 0;
       while (!TransferableBlockUtils.isEndOfStream(mailboxSendOperator.nextBlock())) {
         LOGGER.debug("Acquired transferable block: {}", blockCounter++);
@@ -184,22 +188,15 @@ public class QueryRunner {
     return requests;
   }
 
-  private DataBlock processServerQuery(ServerQueryRequest serverQueryRequest, ExecutorService executorService) {
-    DataBlock dataBlock;
+  private InstanceResponseBlock processServerQuery(ServerQueryRequest serverQueryRequest,
+      ExecutorService executorService) {
     try {
-      InstanceResponseBlock instanceResponse = _serverExecutor.execute(serverQueryRequest, executorService);
-      if (!instanceResponse.getExceptions().isEmpty()) {
-        // if contains exception, directly return a metadata block with the exceptions.
-        dataBlock = DataBlockUtils.getErrorDataBlock(instanceResponse.getExceptions());
-      } else {
-        // this works because default DataTableImplV3 will have a version number at beginning:
-        // the new DataBlock encodes lower 16 bits as version and upper 16 bits as type (ROW, COLUMNAR, METADATA)
-        dataBlock = DataBlockUtils.getDataBlock(ByteBuffer.wrap(instanceResponse.toDataTable().toBytes()));
-      }
+      return _serverExecutor.execute(serverQueryRequest, executorService);
     } catch (Exception e) {
-      dataBlock = DataBlockUtils.getErrorDataBlock(e);
+      InstanceResponseBlock errorResponse = new InstanceResponseBlock();
+      errorResponse.getExceptions().put(QueryException.QUERY_EXECUTION_ERROR_CODE, e.getMessage());
+      return errorResponse;
     }
-    return dataBlock;
   }
 
   /**
@@ -216,16 +213,13 @@ public class QueryRunner {
   private static class LeafStageTransferableBlockOperator extends BaseOperator<TransferableBlock> {
     private static final String EXPLAIN_NAME = "LEAF_STAGE_TRANSFER_OPERATOR";
 
-    private final DataBlock _errorBlock;
-    private final List<DataBlock> _baseDataBlocks;
-    private final DataSchema _dataSchema;
-    private boolean _hasTransferred;
+    private final InstanceResponseBlock _errorBlock;
+    private final List<InstanceResponseBlock> _baseResultBlock;
     private int _currentIndex;
 
-    private LeafStageTransferableBlockOperator(List<DataBlock> baseDataBlocks, DataSchema dataSchema) {
-      _baseDataBlocks = baseDataBlocks;
-      _dataSchema = dataSchema;
-      _errorBlock = baseDataBlocks.stream().filter(e -> !e.getExceptions().isEmpty()).findFirst().orElse(null);
+    private LeafStageTransferableBlockOperator(List<InstanceResponseBlock> baseResultBlock, DataSchema dataSchema) {
+      _baseResultBlock = baseResultBlock;
+      _errorBlock = baseResultBlock.stream().filter(e -> !e.getExceptions().isEmpty()).findFirst().orElse(null);
       _currentIndex = 0;
     }
 
@@ -247,10 +241,18 @@ public class QueryRunner {
       }
       if (_errorBlock != null) {
         _currentIndex = -1;
-        return new TransferableBlock(_errorBlock);
+        return new TransferableBlock(DataBlockUtils.getErrorDataBlock(_errorBlock.getExceptions()));
       } else {
-        if (_currentIndex < _baseDataBlocks.size()) {
-          return new TransferableBlock(_baseDataBlocks.get(_currentIndex++));
+        if (_currentIndex < _baseResultBlock.size()) {
+          InstanceResponseBlock responseBlock = _baseResultBlock.get(_currentIndex++);
+          BaseResultsBlock resultsBlock = responseBlock.getResultsBlock();
+          if (resultsBlock != null) {
+            return new TransferableBlock(toList(resultsBlock.getRows(responseBlock.getQueryContext())),
+                responseBlock.getDataSchema(), DataBlock.Type.ROW);
+          } else {
+            return new TransferableBlock(Collections.emptyList(), responseBlock.getDataSchema(),
+                DataBlock.Type.ROW);
+          }
         } else {
           _currentIndex = -1;
           return new TransferableBlock(DataBlockUtils.getEndOfStreamDataBlock());
@@ -259,6 +261,21 @@ public class QueryRunner {
     }
   }
 
+  private static List<Object[]> toList(Collection<Object[]> collections) {
+    if (collections instanceof List) {
+      return (List<Object[]>) collections;
+    } else if (collections instanceof PriorityQueue) {
+      PriorityQueue<Object[]> priorityQueue = (PriorityQueue<Object[]>) collections;
+      List<Object[]> sortedList = new ArrayList<>(priorityQueue.size());
+      while (!priorityQueue.isEmpty()) {
+        sortedList.add(priorityQueue.poll());
+      }
+      return sortedList;
+    } else {
+      throw new UnsupportedOperationException("Unsupported collection type: " + collections.getClass());
+    }
+  }
+
   private boolean isLeafStage(DistributedStagePlan distributedStagePlan) {
     int stageId = distributedStagePlan.getStageId();
     ServerInstance serverInstance = distributedStagePlan.getServerInstance();
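
The new toList() helper in the hunk above drains a PriorityQueue via poll() rather than copying it, because a PriorityQueue's iterator makes no ordering guarantee. A small standalone illustration (not from the patch):

    import java.util.ArrayList;
    import java.util.List;
    import java.util.PriorityQueue;

    public final class DrainSketch {
      public static void main(String[] args) {
        PriorityQueue<Integer> pq = new PriorityQueue<>(List.of(5, 1, 4, 2, 3));
        // Copying iterates in heap order, which is generally not sorted...
        System.out.println(new ArrayList<>(pq));  // e.g. [1, 2, 4, 5, 3]
        // ...so toList() polls until empty to preserve the order the V1 engine produced.
        List<Integer> sorted = new ArrayList<>(pq.size());
        while (!pq.isEmpty()) {
          sorted.add(pq.poll());
        }
        System.out.println(sorted);  // [1, 2, 3, 4, 5]
      }
    }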
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/blocks/TransferableBlock.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/blocks/TransferableBlock.java
index 959578cbe9..ec89f0d1c9 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/blocks/TransferableBlock.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/blocks/TransferableBlock.java
@@ -18,6 +18,7 @@
  */
 package org.apache.pinot.query.runtime.blocks;
 
+import com.google.common.annotations.VisibleForTesting;
 import java.util.List;
 import org.apache.pinot.common.datablock.ColumnarDataBlock;
 import org.apache.pinot.common.datablock.DataBlock;
@@ -47,6 +48,12 @@ public class TransferableBlock implements Block {
   private List<Object[]> _container;
 
   public TransferableBlock(List<Object[]> container, DataSchema dataSchema, DataBlock.Type containerType) {
+    this(container, dataSchema, containerType, false);
+  }
+
+  @VisibleForTesting
+  TransferableBlock(List<Object[]> container, DataSchema dataSchema, DataBlock.Type containerType,
+      boolean isErrorBlock) {
     _container = container;
     _dataSchema = dataSchema;
     _type = containerType;
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/blocks/TransferableBlockUtils.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/blocks/TransferableBlockUtils.java
index 81eb6b2505..66a8982486 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/blocks/TransferableBlockUtils.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/blocks/TransferableBlockUtils.java
@@ -23,13 +23,14 @@ import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
-import org.apache.pinot.common.datablock.BaseDataBlock;
 import org.apache.pinot.common.datablock.DataBlock;
 import org.apache.pinot.common.datablock.DataBlockUtils;
-import org.apache.pinot.common.datablock.RowDataBlock;
+import org.apache.pinot.common.utils.DataSchema;
 
 
 public final class TransferableBlockUtils {
+  private static final int MEDIAN_COLUMN_SIZE_BYTES = 8;
+
   private TransferableBlockUtils() {
     // do not instantiate.
   }
@@ -59,37 +60,80 @@ public final class TransferableBlockUtils {
   }
 
   /**
-   *  Split a block into multiple block so that each block size is within maxBlockSize.
-   *  Currently, we only support split for row type dataBlock.
-   *  For columnar data block, we return the original data block.
-   *  Metadata data block split is not supported.
+   * Splits a block into multiple blocks; by default no row clean-up (canonicalization) is performed.
    *
-   *  When row size is greater than maxBlockSize, we pack each row as a separate block.
+   * @see TransferableBlockUtils#splitBlock(TransferableBlock, DataBlock.Type, int, boolean)
    */
-  public static List<TransferableBlock> splitBlock(TransferableBlock block, BaseDataBlock.Type type, int maxBlockSize) {
+  public static List<TransferableBlock> splitBlock(TransferableBlock block, DataBlock.Type type, int maxBlockSize) {
+    return splitBlock(block, type, maxBlockSize, false);
+  }
+
+  /**
+   *
+   *  Split a block into multiple blocks so that each block's size is within maxBlockSize. Currently,
+   *  <ul>
+   *    <li>For row data blocks, the rows are split into chunks.</li>
+   *    <li>For columnar data blocks, an exception is thrown.</li>
+   *    <li>For metadata blocks, no split is performed; the block is returned as-is.</li>
+   *  </ul>
+   *
+   * @param block the data block
+   * @param type the type of the block
+   * @param maxBlockSize the size budget; each chunk of data is estimated to be smaller than maxBlockSize
+   * @param needsCanonicalize whether the input rows need to be canonicalized. Set to true if the block is constructed
+   *                          from the leaf stage.
+   * @return a list of data block chunks
+   */
+  public static List<TransferableBlock> splitBlock(TransferableBlock block, DataBlock.Type type, int maxBlockSize,
+      boolean needsCanonicalize) {
     List<TransferableBlock> blockChunks = new ArrayList<>();
-    if (type != DataBlock.Type.ROW) {
-      return Collections.singletonList(block);
-    } else {
-      int rowSizeInBytes = ((RowDataBlock) block.getDataBlock()).getRowSizeInBytes();
-      int numRowsPerChunk = maxBlockSize / rowSizeInBytes;
+    if (type == DataBlock.Type.ROW) {
+      // Use an estimated row size; the estimate is not accurate and is only used to derive numRowsPerChunk.
+      int estimatedRowSizeInBytes = block.getDataSchema().getColumnNames().length * MEDIAN_COLUMN_SIZE_BYTES;
+      int numRowsPerChunk = maxBlockSize / estimatedRowSizeInBytes;
       Preconditions.checkState(numRowsPerChunk > 0, "row size too large for query engine to handle, abort!");
 
       int totalNumRows = block.getNumRows();
       List<Object[]> allRows = block.getContainer();
+      DataSchema dataSchema = block.getDataSchema();
       int currentRow = 0;
       while (currentRow < totalNumRows) {
         List<Object[]> chunk = allRows.subList(currentRow, Math.min(currentRow + numRowsPerChunk, allRows.size()));
+        if (needsCanonicalize) {
+          canonicalizeRows(chunk, dataSchema);
+        }
         currentRow += numRowsPerChunk;
         blockChunks.add(new TransferableBlock(chunk, block.getDataSchema(), block.getType()));
       }
+      return blockChunks;
+    } else if (type == DataBlock.Type.METADATA) {
+      return Collections.singletonList(block);
+    } else {
+      throw new IllegalArgumentException("Unsupported data block type: " + type);
+    }
+  }
+
+  // Canonicalize rows in place.
+  private static void canonicalizeRows(List<Object[]> rows, DataSchema dataSchema) {
+    for (int i = 0; i < rows.size(); i++) {
+      rows.set(i, canonicalizeRow(rows.get(i), dataSchema));
     }
-    return blockChunks;
   }
 
-  public static Object[] getRow(TransferableBlock transferableBlock, int rowId) {
-    Preconditions.checkState(transferableBlock.getType() == DataBlock.Type.ROW,
-        "TransferableBlockUtils doesn't support get row from non-ROW-based data block type yet!");
-    return transferableBlock.getContainer().get(rowId);
+  /**
+   * This util canonicalizes a row generated by the V1 engine, which stores values in the
+   * {@link DataSchema#getStoredColumnDataTypes()} format. However, the transferable block ser/de stores data in the
+   * {@link DataSchema#getColumnDataTypes()} format.
+   *
+   * @param row the un-canonicalized row.
+   * @param dataSchema the data schema desired for the row.
+   * @return the canonicalized row.
+   */
+  private static Object[] canonicalizeRow(Object[] row, DataSchema dataSchema) {
+    Object[] resultRow = new Object[row.length];
+    for (int colId = 0; colId < row.length; colId++) {
+      resultRow[colId] = dataSchema.getColumnDataType(colId).convert(row[colId]);
+    }
+    return resultRow;
   }
 }
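
To make the stored-vs-logical distinction concrete, a toy example (illustrative values only; Pinot's actual conversion goes through ColumnDataType#convert): the V1 engine stores BOOLEAN as an int and TIMESTAMP as long millis, while the transferable block carries the logical types:

    import java.sql.Timestamp;
    import java.util.Arrays;

    public final class CanonicalizeSketch {
      public static void main(String[] args) {
        // Stored form, as produced by the V1 engine: BOOLEAN as int, TIMESTAMP as long.
        Object[] storedRow = {1, 1668109284000L};
        // Canonical (logical) form, as the transferable block ser/de expects it.
        Object[] canonicalRow = {
            ((int) storedRow[0]) == 1,          // BOOLEAN: int 1/0 -> true/false
            new Timestamp((long) storedRow[1])  // TIMESTAMP: epoch millis -> Timestamp
        };
        System.out.println(Arrays.toString(canonicalRow));
      }
    }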
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxSendOperator.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxSendOperator.java
index a26fa19626..71872ebefd 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxSendOperator.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxSendOperator.java
@@ -28,7 +28,7 @@ import java.util.Random;
 import java.util.Set;
 import javax.annotation.Nullable;
 import org.apache.calcite.rel.RelDistribution;
-import org.apache.pinot.common.datablock.BaseDataBlock;
+import org.apache.pinot.common.datablock.DataBlock;
 import org.apache.pinot.common.utils.DataSchema;
 import org.apache.pinot.core.common.Operator;
 import org.apache.pinot.core.operator.BaseOperator;
@@ -67,12 +67,13 @@ public class MailboxSendOperator extends BaseOperator<TransferableBlock> {
   private final int _stageId;
   private final MailboxService<TransferableBlock> _mailboxService;
   private final DataSchema _dataSchema;
+  private final boolean _isLeafStageSender;
   private Operator<TransferableBlock> _dataTableBlockBaseOperator;
 
   public MailboxSendOperator(MailboxService<TransferableBlock> mailboxService, DataSchema dataSchema,
       Operator<TransferableBlock> dataTableBlockBaseOperator, List<ServerInstance> receivingStageInstances,
       RelDistribution.Type exchangeType, KeySelector<Object[], Object[]> keySelector, String hostName, int port,
-      long jobId, int stageId) {
+      long jobId, int stageId, boolean isLeafStageSender) {
     _dataSchema = dataSchema;
     _mailboxService = mailboxService;
     _dataTableBlockBaseOperator = dataTableBlockBaseOperator;
@@ -98,6 +99,7 @@ public class MailboxSendOperator extends BaseOperator<TransferableBlock> {
     _stageId = stageId;
     Preconditions.checkState(SUPPORTED_EXCHANGE_TYPE.contains(_exchangeType),
         String.format("Exchange type '%s' is not supported yet", _exchangeType));
+    _isLeafStageSender = isLeafStageSender;
   }
 
   @Override
@@ -129,7 +131,7 @@ public class MailboxSendOperator extends BaseOperator<TransferableBlock> {
     }
 
     boolean isEndOfStream = TransferableBlockUtils.isEndOfStream(transferableBlock);
-    BaseDataBlock.Type type = transferableBlock.getType();
+    DataBlock.Type type = transferableBlock.getType();
     try {
       switch (_exchangeType) {
         case SINGLETON:
@@ -182,8 +184,7 @@ public class MailboxSendOperator extends BaseOperator<TransferableBlock> {
       for (int i = 0; i < partitionSize; i++) {
         temporaryRows.add(new ArrayList<>());
       }
-      for (int rowId = 0; rowId < transferableBlock.getNumRows(); rowId++) {
-        Object[] row = TransferableBlockUtils.getRow(transferableBlock, rowId);
+      for (Object[] row : transferableBlock.getContainer()) {
         int partitionId = keySelector.computeHash(row) % partitionSize;
         temporaryRows.get(partitionId).add(row);
       }
@@ -197,7 +198,7 @@ public class MailboxSendOperator extends BaseOperator<TransferableBlock> {
   }
 
   private void sendDataTableBlockToServers(List<ServerInstance> servers, TransferableBlock transferableBlock,
-      BaseDataBlock.Type type, boolean isEndOfStream) {
+      DataBlock.Type type, boolean isEndOfStream) {
     if (isEndOfStream) {
       for (ServerInstance server : servers) {
         sendDataTableBlock(server, transferableBlock, true);
@@ -205,7 +206,7 @@ public class MailboxSendOperator extends BaseOperator<TransferableBlock> {
     } else {
       // Split the block only when it is not end of stream block.
       List<TransferableBlock> chunks = TransferableBlockUtils.splitBlock(transferableBlock, type,
-          MAX_MAILBOX_CONTENT_SIZE_BYTES);
+          MAX_MAILBOX_CONTENT_SIZE_BYTES, _isLeafStageSender);
       for (ServerInstance server : servers) {
         for (TransferableBlock chunk : chunks) {
           sendDataTableBlock(server, chunk, false);
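
The hash-partition loop above buckets each row by key hash before sending. A standalone sketch of the same idea (the key extraction below stands in for Pinot's KeySelector#computeHash and is not the patch's code):

    import java.util.ArrayList;
    import java.util.List;
    import java.util.Objects;

    public final class HashPartitionSketch {
      // Bucket rows by the hash of their first column; each bucket can then be
      // sent to the corresponding receiving server as its own block.
      static List<List<Object[]>> partition(List<Object[]> rows, int partitionSize) {
        List<List<Object[]>> buckets = new ArrayList<>(partitionSize);
        for (int i = 0; i < partitionSize; i++) {
          buckets.add(new ArrayList<>());
        }
        for (Object[] row : rows) {
          // floorMod keeps the id non-negative even for negative hashes.
          int partitionId = Math.floorMod(Objects.hashCode(row[0]), partitionSize);
          buckets.get(partitionId).add(row);
        }
        return buckets;
      }
    }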
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/PhysicalPlanVisitor.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/PhysicalPlanVisitor.java
index 6ba6d6b4c1..57000e1267 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/PhysicalPlanVisitor.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/PhysicalPlanVisitor.java
@@ -76,7 +76,7 @@ public class PhysicalPlanVisitor implements StageNodeVisitor<Operator<Transferab
     StageMetadata receivingStageMetadata = context.getMetadataMap().get(node.getReceiverStageId());
     return new MailboxSendOperator(context.getMailboxService(), node.getDataSchema(), nextOperator,
         receivingStageMetadata.getServerInstances(), node.getExchangeType(), node.getPartitionKeySelector(),
-        context.getHostName(), context.getPort(), context.getRequestId(), node.getStageId());
+        context.getHostName(), context.getPort(), context.getRequestId(), node.getStageId(), false);
   }
 
   @Override
diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/TransferableBlockUtilsTest.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/TransferableBlockUtilsTest.java
index 1d30274588..9d64592e94 100644
--- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/TransferableBlockUtilsTest.java
+++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/TransferableBlockUtilsTest.java
@@ -38,6 +38,7 @@ import org.testng.annotations.Test;
 
 public class TransferableBlockUtilsTest {
   private static final int TOTAL_ROW_COUNT = 50;
+  private static final int TEST_EST_BYTES_PER_COLUMN = 8;
   private static final List<DataSchema.ColumnDataType> EXCLUDE_DATA_TYPES = ImmutableList.of(
       DataSchema.ColumnDataType.OBJECT, DataSchema.ColumnDataType.JSON, DataSchema.ColumnDataType.BYTES,
       DataSchema.ColumnDataType.BYTES_ARRAY);
@@ -67,15 +68,15 @@ public class TransferableBlockUtilsTest {
   public void testSplitBlockUtils(int splitRowCount)
       throws Exception {
     DataSchema dataSchema = getDataSchema();
-    List<Object[]> rows = DataBlockTestUtils.getRandomRows(dataSchema, TOTAL_ROW_COUNT, 1);
     // compare serialized split
+    int estRowSizeInBytes = dataSchema.size() * TEST_EST_BYTES_PER_COLUMN;
+    List<Object[]> rows = DataBlockTestUtils.getRandomRows(dataSchema, TOTAL_ROW_COUNT, 1);
     RowDataBlock rowBlock = DataBlockBuilder.buildFromRows(rows, dataSchema);
-    int rowSizeInBytes = rowBlock.getRowSizeInBytes();
     validateBlocks(TransferableBlockUtils.splitBlock(new TransferableBlock(rowBlock),
-        DataBlock.Type.ROW, rowSizeInBytes * splitRowCount + 1), rows, dataSchema);
+        DataBlock.Type.ROW, estRowSizeInBytes * splitRowCount + 1), rows, dataSchema);
     // compare non-serialized split
     validateBlocks(TransferableBlockUtils.splitBlock(new TransferableBlock(rows, dataSchema, DataBlock.Type.ROW),
-        DataBlock.Type.ROW, rowSizeInBytes * splitRowCount + 1), rows, dataSchema);
+        DataBlock.Type.ROW, estRowSizeInBytes * splitRowCount + 1), rows, dataSchema);
   }
 
   @Test
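
For a quick feel of the chunk-size arithmetic the test now shares with splitBlock() (the column count and maxBlockSize below are assumed figures, not values from the patch):

    public final class ChunkMathSketch {
      public static void main(String[] args) {
        int medianColumnSizeBytes = 8;       // MEDIAN_COLUMN_SIZE_BYTES in the patch
        int numColumns = 4;                  // assumed schema width
        int maxBlockSize = 4 * 1024 * 1024;  // assumed mailbox budget, 4 MiB
        int estimatedRowSizeInBytes = numColumns * medianColumnSizeBytes;  // 32 bytes
        int numRowsPerChunk = maxBlockSize / estimatedRowSizeInBytes;
        System.out.println(numRowsPerChunk); // 131072 rows per chunk
      }
    }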
diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/testutils/MockDataBlockOperatorFactory.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/testutils/MockDataBlockOperatorFactory.java
index a85ae59e6d..0140576981 100644
--- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/testutils/MockDataBlockOperatorFactory.java
+++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/testutils/MockDataBlockOperatorFactory.java
@@ -22,7 +22,7 @@ import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import org.apache.pinot.common.datablock.BaseDataBlock;
+import org.apache.pinot.common.datablock.DataBlock;
 import org.apache.pinot.common.utils.DataSchema;
 import org.apache.pinot.core.operator.BaseOperator;
 import org.apache.pinot.query.runtime.blocks.TransferableBlock;
@@ -62,7 +62,7 @@ public class MockDataBlockOperatorFactory {
         return _invocationCount >= _rowsMap.get(operatorName).size()
             ? TransferableBlockUtils.getEndOfStreamTransferableBlock()
             : new TransferableBlock(_rowsMap.get(operatorName).get(_invocationCount++),
-                _operatorSchemaMap.get(operatorName), BaseDataBlock.Type.ROW);
+                _operatorSchemaMap.get(operatorName), DataBlock.Type.ROW);
       }
     });
     return operator;


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org