Posted to commits@pinot.apache.org by GitBox <gi...@apache.org> on 2022/11/08 03:35:33 UTC

[GitHub] [pinot] walterddr opened a new pull request, #9755: [multistage] leaf-node return data in non-serialized format

walterddr opened a new pull request, #9755:
URL: https://github.com/apache/pinot/pull/9755

   Currently, the leaf node returns a data table. This data table gets deserialized into rows, split into chunks, and then reserialized into smaller chunks for RPC sending.
   
   This PR directly uses the `InstanceResponseBlock` to get the non-serialized rows, which avoids two additional ser/de steps.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org


[GitHub] [pinot] walterddr commented on a diff in pull request #9755: [multistage] leaf-node return data in non-serialized format

Posted by GitBox <gi...@apache.org>.
walterddr commented on code in PR #9755:
URL: https://github.com/apache/pinot/pull/9755#discussion_r1019473008


##########
pinot-core/src/main/java/org/apache/pinot/core/common/datablock/DataBlockBuilder.java:
##########
@@ -110,8 +109,8 @@ public static RowDataBlock buildFromRows(List<Object[]> rows, DataSchema dataSch
       nullPlaceholders[colId] = columnDataTypes[colId].convert(storedColumnDataTypes[colId].getNullPlaceholder());
     }
     rowBuilder._numRows = rows.size();
-    for (int rowId = 0; rowId < rows.size(); rowId++) {
-      Object[] row = rows.get(rowId);
+    int rowId = 0;
+    for (Object[] row : rows) {

Review Comment:
   Missed this place when reverting from `Collection<Object[]>` back to `List<Object[]>`.





[GitHub] [pinot] siddharthteotia commented on a diff in pull request #9755: [multistage] leaf-node return data in non-serialized format

Posted by GitBox <gi...@apache.org>.
siddharthteotia commented on code in PR #9755:
URL: https://github.com/apache/pinot/pull/9755#discussion_r1018303289


##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/blocks/TransferableBlockUtils.java:
##########
@@ -59,37 +60,67 @@ public static boolean isNoOpBlock(TransferableBlock transferableBlock) {
   }
 
   /**
-   *  Split a block into multiple block so that each block size is within maxBlockSize.
-   *  Currently, we only support split for row type dataBlock.
-   *  For columnar data block, we return the original data block.
-   *  Metadata data block split is not supported.
+   * Split block into multiple blocks. Default without any clean up.
    *
-   *  When row size is greater than maxBlockSize, we pack each row as a separate block.
+   * @see TransferableBlockUtils#splitBlock(TransferableBlock, BaseDataBlock.Type, int, boolean)
    */
   public static List<TransferableBlock> splitBlock(TransferableBlock block, BaseDataBlock.Type type, int maxBlockSize) {
+    return splitBlock(block, type, maxBlockSize, false);
+  }
+
+  /**
+   *
+   *  Split a block into multiple block so that each block size is within maxBlockSize. Currently,
+   *  <ul>
+   *    <li>For row data block, we split for row type dataBlock.</li>
+   *    <li>For columnar data block, we return the original data block.</li>
+   *    <li>For metadata block, split is not supported.</li>
+   *  </ul>
+   *
+   * @param block the data block
+   * @param type type of block
+   * @param maxBlockSize Each chunk of data is less than maxBlockSize
+   * @param isCleanupRequired whether clean up is required, set to true if the block is constructed from leaf stage.
+   * @return a list of data block chunks
+   */
+  public static List<TransferableBlock> splitBlock(TransferableBlock block, BaseDataBlock.Type type, int maxBlockSize,
+      boolean isCleanupRequired) {
     List<TransferableBlock> blockChunks = new ArrayList<>();
     if (type != DataBlock.Type.ROW) {
       return Collections.singletonList(block);
     } else {
-      int rowSizeInBytes = ((RowDataBlock) block.getDataBlock()).getRowSizeInBytes();
-      int numRowsPerChunk = maxBlockSize / rowSizeInBytes;
+      int estimatedRowSizeInBytes = block.getDataSchema().getColumnNames().length * 8;

Review Comment:
   (nit) Can you declare a static final constant for 8 instead?





[GitHub] [pinot] walterddr commented on a diff in pull request #9755: [multistage] leaf-node return data in non-serialized format

Posted by GitBox <gi...@apache.org>.
walterddr commented on code in PR #9755:
URL: https://github.com/apache/pinot/pull/9755#discussion_r1018462041


##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/blocks/TransferableBlockUtils.java:
##########
@@ -59,37 +60,67 @@ public static boolean isNoOpBlock(TransferableBlock transferableBlock) {
   }
 
   /**
-   *  Split a block into multiple block so that each block size is within maxBlockSize.
-   *  Currently, we only support split for row type dataBlock.
-   *  For columnar data block, we return the original data block.
-   *  Metadata data block split is not supported.
+   * Split block into multiple blocks. Default without any clean up.
    *
-   *  When row size is greater than maxBlockSize, we pack each row as a separate block.
+   * @see TransferableBlockUtils#splitBlock(TransferableBlock, BaseDataBlock.Type, int, boolean)
    */
   public static List<TransferableBlock> splitBlock(TransferableBlock block, BaseDataBlock.Type type, int maxBlockSize) {
+    return splitBlock(block, type, maxBlockSize, false);
+  }
+
+  /**
+   *
+   *  Split a block into multiple block so that each block size is within maxBlockSize. Currently,
+   *  <ul>
+   *    <li>For row data block, we split for row type dataBlock.</li>
+   *    <li>For columnar data block, we return the original data block.</li>
+   *    <li>For metadata block, split is not supported.</li>
+   *  </ul>
+   *
+   * @param block the data block
+   * @param type type of block
+   * @param maxBlockSize Each chunk of data is less than maxBlockSize
+   * @param isCleanupRequired whether clean up is required, set to true if the block is constructed from leaf stage.
+   * @return a list of data block chunks
+   */
+  public static List<TransferableBlock> splitBlock(TransferableBlock block, BaseDataBlock.Type type, int maxBlockSize,
+      boolean isCleanupRequired) {
     List<TransferableBlock> blockChunks = new ArrayList<>();
     if (type != DataBlock.Type.ROW) {
       return Collections.singletonList(block);
     } else {
-      int rowSizeInBytes = ((RowDataBlock) block.getDataBlock()).getRowSizeInBytes();
-      int numRowsPerChunk = maxBlockSize / rowSizeInBytes;
+      int estimatedRowSizeInBytes = block.getDataSchema().getColumnNames().length * 8;

Review Comment:
   Yeah, we don't have to be precise (for example, if there are variable-length fields, it will never be precise)
   unless we check exactly the bytes written into the buffers, which seems overly complicated.
   
   But I will refactor out the constant.
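
As a rough illustration of the constant refactor discussed above, here is a minimal sketch. The class name, the constant name `ESTIMATED_BYTES_PER_COLUMN`, and the helper method are hypothetical and not taken from the Pinot code base; only the factor of 8 and the "rows per chunk must be positive" check come from the diff.

```java
/** Hypothetical sketch of the row-size estimate; not the actual Pinot class. */
final class RowSizeEstimateSketch {
  // Assumed name for the magic number 8 in the diff: a per-column width used only for estimation.
  static final int ESTIMATED_BYTES_PER_COLUMN = 8;

  private RowSizeEstimateSketch() {
  }

  /** Returns how many rows fit into one chunk, based on the column count alone. */
  static int estimateNumRowsPerChunk(int numColumns, int maxBlockSize) {
    if (numColumns <= 0) {
      throw new IllegalArgumentException("numColumns must be positive");
    }
    int estimatedRowSizeInBytes = numColumns * ESTIMATED_BYTES_PER_COLUMN;
    int numRowsPerChunk = maxBlockSize / estimatedRowSizeInBytes;
    if (numRowsPerChunk <= 0) {
      // Mirrors the intent of the Preconditions.checkState call in the diff.
      throw new IllegalStateException("row size too large for query engine to handle, abort!");
    }
    return numRowsPerChunk;
  }
}
```

Because the estimate ignores variable-length fields, the resulting chunks may be larger or smaller than `maxBlockSize` in actual bytes; the sketch only bounds the row count.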





[GitHub] [pinot] agavra commented on a diff in pull request #9755: [multistage] leaf-node return data in non-serialized format

Posted by GitBox <gi...@apache.org>.
agavra commented on code in PR #9755:
URL: https://github.com/apache/pinot/pull/9755#discussion_r1019451408


##########
pinot-core/src/main/java/org/apache/pinot/core/common/datablock/DataBlockBuilder.java:
##########
@@ -110,8 +109,8 @@ public static RowDataBlock buildFromRows(List<Object[]> rows, DataSchema dataSch
       nullPlaceholders[colId] = columnDataTypes[colId].convert(storedColumnDataTypes[colId].getNullPlaceholder());
     }
     rowBuilder._numRows = rows.size();
-    for (int rowId = 0; rowId < rows.size(); rowId++) {
-      Object[] row = rows.get(rowId);
+    int rowId = 0;
+    for (Object[] row : rows) {

Review Comment:
   just curious, why change this from an indexed for loop if we're just going to do that anyway?
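
For context on the question above, a small sketch of the two loop styles. The class and method names are hypothetical; the point is only that the indexed form needs `List.get(int)`, while the enhanced-for form with a manual counter works for any `Collection`.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;

/** Hypothetical comparison of the two loop styles from the diff; not Pinot code. */
final class LoopStyleSketch {
  private LoopStyleSketch() {
  }

  /** Indexed loop: requires random access through List.get(int). */
  static List<String> indexed(List<Object[]> rows) {
    List<String> out = new ArrayList<>();
    for (int rowId = 0; rowId < rows.size(); rowId++) {
      Object[] row = rows.get(rowId);
      out.add(rowId + ":" + row[0]);
    }
    return out;
  }

  /** Enhanced for with a manual counter: only requires iteration. */
  static List<String> iterated(Collection<Object[]> rows) {
    List<String> out = new ArrayList<>();
    int rowId = 0;
    for (Object[] row : rows) {
      out.add(rowId + ":" + row[0]);
      rowId++;
    }
    return out;
  }
}
```

For a `List` input both methods produce the same result, so once the parameter type is `List<Object[]>` again the choice is mostly stylistic.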





[GitHub] [pinot] walterddr commented on pull request #9755: [multistage] leaf-node return data in non-serialized format

Posted by GitBox <gi...@apache.org>.
walterddr commented on PR #9755:
URL: https://github.com/apache/pinot/pull/9755#issuecomment-1309003496

   FYI @ankitsultana @agavra @siddharthteotia 




[GitHub] [pinot] siddharthteotia commented on a diff in pull request #9755: [multistage] leaf-node return data in non-serialized format

Posted by GitBox <gi...@apache.org>.
siddharthteotia commented on code in PR #9755:
URL: https://github.com/apache/pinot/pull/9755#discussion_r1018481538


##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/blocks/TransferableBlockUtils.java:
##########
@@ -59,37 +60,67 @@ public static boolean isNoOpBlock(TransferableBlock transferableBlock) {
   }
 
   /**
-   *  Split a block into multiple block so that each block size is within maxBlockSize.
-   *  Currently, we only support split for row type dataBlock.
-   *  For columnar data block, we return the original data block.
-   *  Metadata data block split is not supported.
+   * Split block into multiple blocks. Default without any clean up.
    *
-   *  When row size is greater than maxBlockSize, we pack each row as a separate block.
+   * @see TransferableBlockUtils#splitBlock(TransferableBlock, BaseDataBlock.Type, int, boolean)
    */
   public static List<TransferableBlock> splitBlock(TransferableBlock block, BaseDataBlock.Type type, int maxBlockSize) {
+    return splitBlock(block, type, maxBlockSize, false);
+  }
+
+  /**
+   *
+   *  Split a block into multiple block so that each block size is within maxBlockSize. Currently,
+   *  <ul>
+   *    <li>For row data block, we split for row type dataBlock.</li>
+   *    <li>For columnar data block, we return the original data block.</li>
+   *    <li>For metadata block, split is not supported.</li>
+   *  </ul>
+   *
+   * @param block the data block
+   * @param type type of block
+   * @param maxBlockSize Each chunk of data is less than maxBlockSize
+   * @param isCleanupRequired whether clean up is required, set to true if the block is constructed from leaf stage.
+   * @return a list of data block chunks
+   */
+  public static List<TransferableBlock> splitBlock(TransferableBlock block, BaseDataBlock.Type type, int maxBlockSize,
+      boolean isCleanupRequired) {
     List<TransferableBlock> blockChunks = new ArrayList<>();
     if (type != DataBlock.Type.ROW) {
       return Collections.singletonList(block);
     } else {
-      int rowSizeInBytes = ((RowDataBlock) block.getDataBlock()).getRowSizeInBytes();
-      int numRowsPerChunk = maxBlockSize / rowSizeInBytes;
+      int estimatedRowSizeInBytes = block.getDataSchema().getColumnNames().length * 8;

Review Comment:
   > unless we check exactly the bytes written into the buffers, which seems overly complicated
   
   Yes this is what we do in one of the places (unrelated to data table serialization code). I am ok with `estimatedSize` as long as it does not impact correctness.





[GitHub] [pinot] siddharthteotia commented on a diff in pull request #9755: [multistage] leaf-node return data in non-serialized format

Posted by GitBox <gi...@apache.org>.
siddharthteotia commented on code in PR #9755:
URL: https://github.com/apache/pinot/pull/9755#discussion_r1018481916


##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxSendOperator.java:
##########
@@ -67,12 +67,13 @@ public class MailboxSendOperator extends BaseOperator<TransferableBlock> {
   private final int _stageId;
   private final MailboxService<TransferableBlock> _mailboxService;
   private final DataSchema _dataSchema;
+  private final boolean _isLeafStageSender;
   private Operator<TransferableBlock> _dataTableBlockBaseOperator;
 
   public MailboxSendOperator(MailboxService<TransferableBlock> mailboxService, DataSchema dataSchema,
       Operator<TransferableBlock> dataTableBlockBaseOperator, List<ServerInstance> receivingStageInstances,
       RelDistribution.Type exchangeType, KeySelector<Object[], Object[]> keySelector, String hostName, int port,
-      long jobId, int stageId) {
+      long jobId, int stageId, boolean isLeafStageSender) {

Review Comment:
   yes the motivation for doing this for leaf layer alone is pretty clear and makes sense





[GitHub] [pinot] walterddr commented on a diff in pull request #9755: [multistage] leaf-node return data in non-serialized format

Posted by GitBox <gi...@apache.org>.
walterddr commented on code in PR #9755:
URL: https://github.com/apache/pinot/pull/9755#discussion_r1019242021


##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/blocks/TransferableBlock.java:
##########
@@ -76,7 +83,7 @@ public DataSchema getDataSchema() {
    *
    * @return data container.
    */
-  public List<Object[]> getContainer() {

Review Comment:
   @agavra this involved changing the container API from List to Collection, to support PriorityQueue pre-sorted intermediate result blocks.
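
To illustrate what widening the container type from `List` to `Collection` allows, here is a minimal sketch with hypothetical names (nothing below is Pinot API). One general Java caveat, independent of this PR: `PriorityQueue`'s iterator is not guaranteed to traverse elements in priority order, so a consumer that needs sorted rows has to drain the queue with `poll()`.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.PriorityQueue;

/** Hypothetical sketch of a Collection-typed row container; not Pinot code. */
final class ContainerTypeSketch {
  private ContainerTypeSketch() {
  }

  /** Accepts any Collection, so both ArrayList and PriorityQueue work as input. */
  static int sumFirstColumn(Collection<Object[]> container) {
    int sum = 0;
    for (Object[] row : container) {
      sum += (Integer) row[0];
    }
    return sum;
  }

  /** Drains the queue with poll(), which does return rows in priority order. */
  static List<Object[]> drainInOrder(PriorityQueue<Object[]> queue) {
    List<Object[]> ordered = new ArrayList<>(queue.size());
    while (!queue.isEmpty()) {
      ordered.add(queue.poll());
    }
    return ordered;
  }
}
```

Order-insensitive consumers such as `sumFirstColumn` can iterate the queue directly; order-sensitive ones would go through something like `drainInOrder`, which empties the queue as a side effect.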





[GitHub] [pinot] codecov-commenter commented on pull request #9755: [multistage] leaf-node return data in non-serialized format

Posted by GitBox <gi...@apache.org>.
codecov-commenter commented on PR #9755:
URL: https://github.com/apache/pinot/pull/9755#issuecomment-1307688019

   # [Codecov](https://codecov.io/gh/apache/pinot/pull/9755?src=pr&el=h1&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) Report
   > Merging [#9755](https://codecov.io/gh/apache/pinot/pull/9755?src=pr&el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (2497a82) into [master](https://codecov.io/gh/apache/pinot/commit/c9a6e5207a6cedf325f4abaeac2970077ab5685d?el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (c9a6e52) will **increase** coverage by `0.01%`.
   > The diff coverage is `87.50%`.
   
   ```diff
   @@             Coverage Diff              @@
   ##             master    #9755      +/-   ##
   ============================================
   + Coverage     68.47%   68.49%   +0.01%     
   + Complexity     5369     4891     -478     
   ============================================
     Files          1953     1954       +1     
     Lines        104797   104839      +42     
     Branches      15871    15875       +4     
   ============================================
   + Hits          71764    71813      +49     
   + Misses        27914    27895      -19     
   - Partials       5119     5131      +12     
   ```
   
   | Flag | Coverage Δ | |
   |---|---|---|
   | integration1 | `25.28% <7.14%> (-0.10%)` | :arrow_down: |
   | unittests1 | `67.57% <87.50%> (+0.01%)` | :arrow_up: |
   | unittests2 | `15.65% <75.00%> (+0.01%)` | :arrow_up: |
   
   Flags with carried forward coverage won't be shown. [Click here](https://docs.codecov.io/docs/carryforward-flags?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#carryforward-flags-in-the-pull-request-comment) to find out more.
   
   | [Impacted Files](https://codecov.io/gh/apache/pinot/pull/9755?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) | Coverage Δ | |
   |---|---|---|
   | [...va/org/apache/pinot/query/runtime/QueryRunner.java](https://codecov.io/gh/apache/pinot/pull/9755/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3QtcXVlcnktcnVudGltZS9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcGlub3QvcXVlcnkvcnVudGltZS9RdWVyeVJ1bm5lci5qYXZh) | `80.00% <58.82%> (-1.56%)` | :arrow_down: |
   | [.../pinot/core/common/datablock/DataBlockBuilder.java](https://codecov.io/gh/apache/pinot/pull/9755/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3QtY29yZS9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcGlub3QvY29yZS9jb21tb24vZGF0YWJsb2NrL0RhdGFCbG9ja0J1aWxkZXIuamF2YQ==) | `73.27% <100.00%> (+0.29%)` | :arrow_up: |
   | [.../pinot/query/runtime/blocks/TransferableBlock.java](https://codecov.io/gh/apache/pinot/pull/9755/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3QtcXVlcnktcnVudGltZS9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcGlub3QvcXVlcnkvcnVudGltZS9ibG9ja3MvVHJhbnNmZXJhYmxlQmxvY2suamF2YQ==) | `70.21% <100.00%> (-0.90%)` | :arrow_down: |
   | [...t/query/runtime/blocks/TransferableBlockUtils.java](https://codecov.io/gh/apache/pinot/pull/9755/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3QtcXVlcnktcnVudGltZS9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcGlub3QvcXVlcnkvcnVudGltZS9ibG9ja3MvVHJhbnNmZXJhYmxlQmxvY2tVdGlscy5qYXZh) | `96.96% <100.00%> (+5.66%)` | :arrow_up: |
   | [...e/pinot/query/runtime/operator/FilterOperator.java](https://codecov.io/gh/apache/pinot/pull/9755/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3QtcXVlcnktcnVudGltZS9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcGlub3QvcXVlcnkvcnVudGltZS9vcGVyYXRvci9GaWx0ZXJPcGVyYXRvci5qYXZh) | `60.00% <100.00%> (ø)` | |
   | [...pinot/query/runtime/operator/HashJoinOperator.java](https://codecov.io/gh/apache/pinot/pull/9755/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3QtcXVlcnktcnVudGltZS9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcGlub3QvcXVlcnkvcnVudGltZS9vcGVyYXRvci9IYXNoSm9pbk9wZXJhdG9yLmphdmE=) | `81.69% <100.00%> (ø)` | |
   | [...ot/query/runtime/operator/MailboxSendOperator.java](https://codecov.io/gh/apache/pinot/pull/9755/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3QtcXVlcnktcnVudGltZS9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcGlub3QvcXVlcnkvcnVudGltZS9vcGVyYXRvci9NYWlsYm94U2VuZE9wZXJhdG9yLmphdmE=) | `88.54% <100.00%> (+0.12%)` | :arrow_up: |
   | [...inot/query/runtime/operator/TransformOperator.java](https://codecov.io/gh/apache/pinot/pull/9755/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3QtcXVlcnktcnVudGltZS9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcGlub3QvcXVlcnkvcnVudGltZS9vcGVyYXRvci9UcmFuc2Zvcm1PcGVyYXRvci5qYXZh) | `79.31% <100.00%> (ø)` | |
   | [.../pinot/query/runtime/plan/PhysicalPlanVisitor.java](https://codecov.io/gh/apache/pinot/pull/9755/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3QtcXVlcnktcnVudGltZS9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcGlub3QvcXVlcnkvcnVudGltZS9wbGFuL1BoeXNpY2FsUGxhblZpc2l0b3IuamF2YQ==) | `96.77% <100.00%> (ø)` | |
   | [...nction/DistinctCountBitmapAggregationFunction.java](https://codecov.io/gh/apache/pinot/pull/9755/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3QtY29yZS9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcGlub3QvY29yZS9xdWVyeS9hZ2dyZWdhdGlvbi9mdW5jdGlvbi9EaXN0aW5jdENvdW50Qml0bWFwQWdncmVnYXRpb25GdW5jdGlvbi5qYXZh) | `47.66% <0.00%> (-13.99%)` | :arrow_down: |
   | ... and [48 more](https://codecov.io/gh/apache/pinot/pull/9755/diff?src=pr&el=tree-more&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) | |
   
   




[GitHub] [pinot] siddharthteotia commented on a diff in pull request #9755: [multistage] leaf-node return data in non-serialized format

Posted by GitBox <gi...@apache.org>.
siddharthteotia commented on code in PR #9755:
URL: https://github.com/apache/pinot/pull/9755#discussion_r1018314119


##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/blocks/TransferableBlockUtils.java:
##########
@@ -59,37 +60,67 @@ public static boolean isNoOpBlock(TransferableBlock transferableBlock) {
   }
 
   /**
-   *  Split a block into multiple block so that each block size is within maxBlockSize.
-   *  Currently, we only support split for row type dataBlock.
-   *  For columnar data block, we return the original data block.
-   *  Metadata data block split is not supported.
+   * Split block into multiple blocks. Default without any clean up.
    *
-   *  When row size is greater than maxBlockSize, we pack each row as a separate block.
+   * @see TransferableBlockUtils#splitBlock(TransferableBlock, BaseDataBlock.Type, int, boolean)
    */
   public static List<TransferableBlock> splitBlock(TransferableBlock block, BaseDataBlock.Type type, int maxBlockSize) {
+    return splitBlock(block, type, maxBlockSize, false);
+  }
+
+  /**
+   *
+   *  Split a block into multiple block so that each block size is within maxBlockSize. Currently,
+   *  <ul>
+   *    <li>For row data block, we split for row type dataBlock.</li>
+   *    <li>For columnar data block, we return the original data block.</li>
+   *    <li>For metadata block, split is not supported.</li>
+   *  </ul>
+   *
+   * @param block the data block
+   * @param type type of block
+   * @param maxBlockSize Each chunk of data is less than maxBlockSize
+   * @param isCleanupRequired whether clean up is required, set to true if the block is constructed from leaf stage.
+   * @return a list of data block chunks
+   */
+  public static List<TransferableBlock> splitBlock(TransferableBlock block, BaseDataBlock.Type type, int maxBlockSize,
+      boolean isCleanupRequired) {
     List<TransferableBlock> blockChunks = new ArrayList<>();
     if (type != DataBlock.Type.ROW) {
       return Collections.singletonList(block);
     } else {
-      int rowSizeInBytes = ((RowDataBlock) block.getDataBlock()).getRowSizeInBytes();
-      int numRowsPerChunk = maxBlockSize / rowSizeInBytes;
+      int estimatedRowSizeInBytes = block.getDataSchema().getColumnNames().length * 8;
+      int numRowsPerChunk = maxBlockSize / estimatedRowSizeInBytes;
       Preconditions.checkState(numRowsPerChunk > 0, "row size too large for query engine to handle, abort!");
 
-      int totalNumRows = block.getNumRows();
-      List<Object[]> allRows = block.getContainer();
-      int currentRow = 0;
-      while (currentRow < totalNumRows) {
-        List<Object[]> chunk = allRows.subList(currentRow, Math.min(currentRow + numRowsPerChunk, allRows.size()));
-        currentRow += numRowsPerChunk;
+      Collection<Object[]> allRows = block.getContainer();
+      DataSchema dataSchema = block.getDataSchema();
+      int rowId = 0;
+      List<Object[]> chunk = new ArrayList<>(numRowsPerChunk);
+      for (Object[] row : allRows) {
+        if (isCleanupRequired) {
+          chunk.add(cleanupRow(row, dataSchema));
+        } else {
+          chunk.add(row);
+        }
+        rowId++;
+        if (rowId % numRowsPerChunk == 0) {
+          blockChunks.add(new TransferableBlock(chunk, block.getDataSchema(), block.getType()));
+          chunk = new ArrayList<>();
+        }
+      }
+      if (chunk.size() > 0) {
         blockChunks.add(new TransferableBlock(chunk, block.getDataSchema(), block.getType()));
       }
     }
     return blockChunks;
   }
 
-  public static Object[] getRow(TransferableBlock transferableBlock, int rowId) {
-    Preconditions.checkState(transferableBlock.getType() == DataBlock.Type.ROW,
-        "TransferableBlockUtils doesn't support get row from non-ROW-based data block type yet!");
-    return transferableBlock.getContainer().get(rowId);
+  private static Object[] cleanupRow(Object[] row, DataSchema dataSchema) {

Review Comment:
   (nit) shouldn't be called cleanup imo but I don't have a better suggestion :)
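
The chunking loop in the diff above can be sketched in isolation as follows. This is a simplified, generic stand-in with hypothetical names: it leaves out `TransferableBlock`, the data schema, and the per-row `cleanupRow` step, and it closes a chunk when `chunk.size()` reaches the limit instead of tracking `rowId % numRowsPerChunk`, which gives the same chunk boundaries.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;

/** Hypothetical, simplified version of the chunking loop; not Pinot code. */
final class ChunkingSketch {
  private ChunkingSketch() {
  }

  /** Splits rows into consecutive chunks of at most numRowsPerChunk elements. */
  static <T> List<List<T>> splitIntoChunks(Collection<T> rows, int numRowsPerChunk) {
    if (numRowsPerChunk <= 0) {
      throw new IllegalArgumentException("numRowsPerChunk must be positive");
    }
    List<List<T>> chunks = new ArrayList<>();
    List<T> chunk = new ArrayList<>();
    // Single pass over the collection, so no subList or random access is needed.
    for (T row : rows) {
      chunk.add(row);
      if (chunk.size() == numRowsPerChunk) {
        chunks.add(chunk);
        chunk = new ArrayList<>();
      }
    }
    // Emit the trailing partial chunk, if any.
    if (!chunk.isEmpty()) {
      chunks.add(chunk);
    }
    return chunks;
  }
}
```

With this shape, seven rows and a limit of three yield chunks of sizes three, three, and one, and an empty input yields no chunks.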





[GitHub] [pinot] siddharthteotia commented on a diff in pull request #9755: [multistage] leaf-node return data in non-serialized format

Posted by GitBox <gi...@apache.org>.
siddharthteotia commented on code in PR #9755:
URL: https://github.com/apache/pinot/pull/9755#discussion_r1018480017


##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/blocks/TransferableBlockUtils.java:
##########
@@ -59,37 +60,67 @@ public static boolean isNoOpBlock(TransferableBlock transferableBlock) {
   }
 
   /**
-   *  Split a block into multiple block so that each block size is within maxBlockSize.
-   *  Currently, we only support split for row type dataBlock.
-   *  For columnar data block, we return the original data block.
-   *  Metadata data block split is not supported.
+   * Split block into multiple blocks. Default without any clean up.
    *
-   *  When row size is greater than maxBlockSize, we pack each row as a separate block.
+   * @see TransferableBlockUtils#splitBlock(TransferableBlock, BaseDataBlock.Type, int, boolean)
    */
   public static List<TransferableBlock> splitBlock(TransferableBlock block, BaseDataBlock.Type type, int maxBlockSize) {
+    return splitBlock(block, type, maxBlockSize, false);
+  }
+
+  /**
+   *
+   *  Split a block into multiple block so that each block size is within maxBlockSize. Currently,
+   *  <ul>
+   *    <li>For row data block, we split for row type dataBlock.</li>
+   *    <li>For columnar data block, we return the original data block.</li>
+   *    <li>For metadata block, split is not supported.</li>
+   *  </ul>
+   *
+   * @param block the data block
+   * @param type type of block
+   * @param maxBlockSize Each chunk of data is less than maxBlockSize
+   * @param isCleanupRequired whether clean up is required, set to true if the block is constructed from leaf stage.
+   * @return a list of data block chunks
+   */
+  public static List<TransferableBlock> splitBlock(TransferableBlock block, BaseDataBlock.Type type, int maxBlockSize,
+      boolean isCleanupRequired) {
     List<TransferableBlock> blockChunks = new ArrayList<>();
     if (type != DataBlock.Type.ROW) {
       return Collections.singletonList(block);
     } else {

Review Comment:
   Yes, I think COLUMNAR format support for transferable blocks is a TODO. We can throw for now, but we should call out the TODO, I guess?





[GitHub] [pinot] siddharthteotia commented on a diff in pull request #9755: [multistage] leaf-node return data in non-serialized format

Posted by GitBox <gi...@apache.org>.
siddharthteotia commented on code in PR #9755:
URL: https://github.com/apache/pinot/pull/9755#discussion_r1018313618


##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/blocks/TransferableBlockUtils.java:
##########
@@ -59,37 +60,67 @@ public static boolean isNoOpBlock(TransferableBlock transferableBlock) {
   }
 
   /**
-   *  Split a block into multiple block so that each block size is within maxBlockSize.
-   *  Currently, we only support split for row type dataBlock.
-   *  For columnar data block, we return the original data block.
-   *  Metadata data block split is not supported.
+   * Split block into multiple blocks. Default without any clean up.
    *
-   *  When row size is greater than maxBlockSize, we pack each row as a separate block.
+   * @see TransferableBlockUtils#splitBlock(TransferableBlock, BaseDataBlock.Type, int, boolean)
    */
   public static List<TransferableBlock> splitBlock(TransferableBlock block, BaseDataBlock.Type type, int maxBlockSize) {
+    return splitBlock(block, type, maxBlockSize, false);
+  }
+
+  /**
+   *
+   *  Split a block into multiple block so that each block size is within maxBlockSize. Currently,
+   *  <ul>
+   *    <li>For row data block, we split for row type dataBlock.</li>
+   *    <li>For columnar data block, we return the original data block.</li>
+   *    <li>For metadata block, split is not supported.</li>
+   *  </ul>
+   *
+   * @param block the data block
+   * @param type type of block
+   * @param maxBlockSize Each chunk of data is less than maxBlockSize
+   * @param isCleanupRequired whether clean up is required, set to true if the block is constructed from leaf stage.
+   * @return a list of data block chunks
+   */
+  public static List<TransferableBlock> splitBlock(TransferableBlock block, BaseDataBlock.Type type, int maxBlockSize,
+      boolean isCleanupRequired) {
     List<TransferableBlock> blockChunks = new ArrayList<>();
     if (type != DataBlock.Type.ROW) {
       return Collections.singletonList(block);
     } else {
-      int rowSizeInBytes = ((RowDataBlock) block.getDataBlock()).getRowSizeInBytes();
-      int numRowsPerChunk = maxBlockSize / rowSizeInBytes;
+      int estimatedRowSizeInBytes = block.getDataSchema().getColumnNames().length * 8;

Review Comment:
   Instead of estimating the row size this way, could we keep adding rows to a chunk until its size would exceed the maxBlockSize threshold, and then start a new chunk at that split boundary?
   
   Or maybe it is possible to precompute the overhead per row (max size of a row) and then use that to derive the number of rows to pack into each chunk / split?
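
The first suggestion above could be sketched roughly as follows. This is only an illustration under stated assumptions, not the code in the PR: `SizeBasedSplitSketch`, `estimateRowSize`, and the 8-bytes-per-value estimate are hypothetical names and numbers, and plain lists stand in for `TransferableBlock`. The sketch closes a chunk just before a row would push it past `maxBlockSize`, so a single oversized row still ends up in a chunk of its own.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch: none of these names exist in Pinot.
final class SizeBasedSplitSketch {
  private SizeBasedSplitSketch() {
  }

  // Assumed estimate: string length for strings, 8 bytes for every other value.
  static int estimateRowSize(Object[] row) {
    int size = 0;
    for (Object value : row) {
      size += (value instanceof String) ? ((String) value).length() : 8;
    }
    return size;
  }

  // Accumulates rows into a chunk and starts a new chunk when the next row would not fit.
  static List<List<Object[]>> split(List<Object[]> rows, int maxBlockSize) {
    List<List<Object[]>> chunks = new ArrayList<>();
    List<Object[]> chunk = new ArrayList<>();
    int chunkSize = 0;
    for (Object[] row : rows) {
      int rowSize = estimateRowSize(row);
      if (!chunk.isEmpty() && chunkSize + rowSize > maxBlockSize) {
        chunks.add(chunk);
        chunk = new ArrayList<>();
        chunkSize = 0;
      }
      chunk.add(row);
      chunkSize += rowSize;
    }
    // Flush the trailing, partially filled chunk.
    if (!chunk.isEmpty()) {
      chunks.add(chunk);
    }
    return chunks;
  }
}
```

Compared with a fixed rows-per-chunk count, this costs one size estimate per row but adapts to variable-width columns.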





[GitHub] [pinot] walterddr commented on a diff in pull request #9755: [multistage] leaf-node return data in non-serialized format

Posted by GitBox <gi...@apache.org>.
walterddr commented on code in PR #9755:
URL: https://github.com/apache/pinot/pull/9755#discussion_r1018468629


##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/blocks/TransferableBlockUtils.java:
##########
@@ -59,37 +60,67 @@ public static boolean isNoOpBlock(TransferableBlock transferableBlock) {
   }
 
   /**
-   *  Split a block into multiple block so that each block size is within maxBlockSize.
-   *  Currently, we only support split for row type dataBlock.
-   *  For columnar data block, we return the original data block.
-   *  Metadata data block split is not supported.
+   * Split block into multiple blocks. Default without any clean up.
    *
-   *  When row size is greater than maxBlockSize, we pack each row as a separate block.
+   * @see TransferableBlockUtils#splitBlock(TransferableBlock, BaseDataBlock.Type, int, boolean)
    */
   public static List<TransferableBlock> splitBlock(TransferableBlock block, BaseDataBlock.Type type, int maxBlockSize) {
+    return splitBlock(block, type, maxBlockSize, false);
+  }
+
+  /**
+   *
+   *  Split a block into multiple block so that each block size is within maxBlockSize. Currently,
+   *  <ul>
+   *    <li>For row data block, we split for row type dataBlock.</li>
+   *    <li>For columnar data block, we return the original data block.</li>
+   *    <li>For metadata block, split is not supported.</li>
+   *  </ul>
+   *
+   * @param block the data block
+   * @param type type of block
+   * @param maxBlockSize Each chunk of data is less than maxBlockSize
+   * @param isCleanupRequired whether clean up is required, set to true if the block is constructed from leaf stage.
+   * @return a list of data block chunks
+   */
+  public static List<TransferableBlock> splitBlock(TransferableBlock block, BaseDataBlock.Type type, int maxBlockSize,
+      boolean isCleanupRequired) {
     List<TransferableBlock> blockChunks = new ArrayList<>();
     if (type != DataBlock.Type.ROW) {
       return Collections.singletonList(block);
     } else {

Review Comment:
   I am inclined to throw on the COLUMNAR format, since we don't have any support for the columnar data format yet. And we should not split a block if it is a metadata block.
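
As a rough sketch of the behavior proposed above (split ROW blocks, pass METADATA blocks through unsplit, and throw on COLUMNAR), assuming a hypothetical `BlockType` enum and `shouldSplit` helper that are not part of Pinot:

```java
// Hypothetical sketch: BlockType and shouldSplit are illustrative names only.
final class SplitTypeGuardSketch {
  enum BlockType { ROW, COLUMNAR, METADATA }

  private SplitTypeGuardSketch() {
  }

  // Returns true when the block should be split, false when it should be sent as-is.
  static boolean shouldSplit(BlockType type) {
    switch (type) {
      case ROW:
        return true;
      case METADATA:
        // Metadata blocks are never split.
        return false;
      case COLUMNAR:
        // TODO: support splitting COLUMNAR blocks once the format is supported.
        throw new UnsupportedOperationException("Splitting COLUMNAR blocks is not supported yet");
      default:
        throw new IllegalStateException("Unknown block type: " + type);
    }
  }
}
```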





[GitHub] [pinot] walterddr commented on a diff in pull request #9755: [multistage] leaf-node return data in non-serialized format

Posted by GitBox <gi...@apache.org>.
walterddr commented on code in PR #9755:
URL: https://github.com/apache/pinot/pull/9755#discussion_r1018467674


##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxSendOperator.java:
##########
@@ -67,12 +67,13 @@ public class MailboxSendOperator extends BaseOperator<TransferableBlock> {
   private final int _stageId;
   private final MailboxService<TransferableBlock> _mailboxService;
   private final DataSchema _dataSchema;
+  private final boolean _isLeafStageSender;
   private Operator<TransferableBlock> _dataTableBlockBaseOperator;
 
   public MailboxSendOperator(MailboxService<TransferableBlock> mailboxService, DataSchema dataSchema,
       Operator<TransferableBlock> dataTableBlockBaseOperator, List<ServerInstance> receivingStageInstances,
       RelDistribution.Type exchangeType, KeySelector<Object[], Object[]> keySelector, String hostName, int port,
-      long jobId, int stageId) {
+      long jobId, int stageId, boolean isLeafStageSender) {

Review Comment:
   Only the leaf-stage sender requires canonicalization, because leaf-stage data is kept in the DataSchema's underlying stored format, which is different from what the data block expects.
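
To illustrate what such a canonicalization step might look like, here is a minimal sketch. It assumes, purely for illustration, that a BOOLEAN column is stored as an integer 0/1 and a TIMESTAMP column as epoch milliseconds; the `ColumnType` enum and this `canonicalizeRow` are hypothetical stand-ins and not the implementation in the PR.

```java
import java.sql.Timestamp;

// Hypothetical sketch: ColumnType and this canonicalizeRow are illustrative only.
final class CanonicalizeRowSketch {
  enum ColumnType { INT, LONG, BOOLEAN, TIMESTAMP, STRING }

  private CanonicalizeRowSketch() {
  }

  // Converts each stored-format value into the assumed external type for its column.
  static Object[] canonicalizeRow(Object[] row, ColumnType[] types) {
    Object[] result = new Object[row.length];
    for (int i = 0; i < row.length; i++) {
      Object value = row[i];
      if (value == null) {
        continue; // leave nulls untouched
      }
      switch (types[i]) {
        case BOOLEAN:
          // Assumption: booleans are stored as 0/1 integers.
          result[i] = ((Number) value).intValue() == 1;
          break;
        case TIMESTAMP:
          // Assumption: timestamps are stored as epoch milliseconds.
          result[i] = new Timestamp(((Number) value).longValue());
          break;
        default:
          result[i] = value;
      }
    }
    return result;
  }
}
```

Blocks produced by intermediate stages would skip this step, which is why a flag such as `isLeafStageSender` is enough to decide whether to run it.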





[GitHub] [pinot] walterddr commented on a diff in pull request #9755: [multistage] leaf-node return data in non-serialized format

Posted by GitBox <gi...@apache.org>.
walterddr commented on code in PR #9755:
URL: https://github.com/apache/pinot/pull/9755#discussion_r1019248407


##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/blocks/TransferableBlock.java:
##########
@@ -76,7 +83,7 @@ public DataSchema getDataSchema() {
    *
    * @return data container.
    */
-  public List<Object[]> getContainer() {

Review Comment:
   Actually, never mind. With the leaf-node split and all, we might not want to change this API; instead, we should ask the order-by combine operator to flatten the PriorityQueue into a list.
   1. The sorted structure won't be preserved anyway once the data is transferred over the wire.
   2. The whole idea was to avoid extra memory usage, not to save time, but it seems we have to pay that cost anyway because of the block split.
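
Flattening the queue into a list could look roughly like the sketch below. `FlattenQueueSketch` and `flatten` are hypothetical names, and this is not the order-by combine operator's actual code. Draining with `poll()` yields rows in comparator order; if ordering does not matter, `new ArrayList<>(queue)` would copy the rows in the queue's internal (unsorted) order without emptying it.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.PriorityQueue;

// Hypothetical sketch: not the actual order-by combine operator code.
final class FlattenQueueSketch {
  private FlattenQueueSketch() {
  }

  // Drains the queue into a list in comparator order; the queue is empty afterwards.
  static List<Object[]> flatten(PriorityQueue<Object[]> queue) {
    List<Object[]> rows = new ArrayList<>(queue.size());
    while (!queue.isEmpty()) {
      rows.add(queue.poll());
    }
    return rows;
  }
}
```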





[GitHub] [pinot] walterddr merged pull request #9755: [multistage] leaf-node return data in non-serialized format

Posted by GitBox <gi...@apache.org>.
walterddr merged PR #9755:
URL: https://github.com/apache/pinot/pull/9755




[GitHub] [pinot] walterddr commented on a diff in pull request #9755: [multistage] leaf-node return data in non-serialized format

Posted by GitBox <gi...@apache.org>.
walterddr commented on code in PR #9755:
URL: https://github.com/apache/pinot/pull/9755#discussion_r1018462505


##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/blocks/TransferableBlockUtils.java:
##########
@@ -59,37 +60,67 @@ public static boolean isNoOpBlock(TransferableBlock transferableBlock) {
   }
 
   /**
-   *  Split a block into multiple block so that each block size is within maxBlockSize.
-   *  Currently, we only support split for row type dataBlock.
-   *  For columnar data block, we return the original data block.
-   *  Metadata data block split is not supported.
+   * Split block into multiple blocks. Default without any clean up.
    *
-   *  When row size is greater than maxBlockSize, we pack each row as a separate block.
+   * @see TransferableBlockUtils#splitBlock(TransferableBlock, BaseDataBlock.Type, int, boolean)
    */
   public static List<TransferableBlock> splitBlock(TransferableBlock block, BaseDataBlock.Type type, int maxBlockSize) {
+    return splitBlock(block, type, maxBlockSize, false);
+  }
+
+  /**
+   *
+   *  Split a block into multiple block so that each block size is within maxBlockSize. Currently,
+   *  <ul>
+   *    <li>For row data block, we split for row type dataBlock.</li>
+   *    <li>For columnar data block, we return the original data block.</li>
+   *    <li>For metadata block, split is not supported.</li>
+   *  </ul>
+   *
+   * @param block the data block
+   * @param type type of block
+   * @param maxBlockSize Each chunk of data is less than maxBlockSize
+   * @param isCleanupRequired whether clean up is required, set to true if the block is constructed from leaf stage.
+   * @return a list of data block chunks
+   */
+  public static List<TransferableBlock> splitBlock(TransferableBlock block, BaseDataBlock.Type type, int maxBlockSize,
+      boolean isCleanupRequired) {
     List<TransferableBlock> blockChunks = new ArrayList<>();
     if (type != DataBlock.Type.ROW) {
       return Collections.singletonList(block);
     } else {
-      int rowSizeInBytes = ((RowDataBlock) block.getDataBlock()).getRowSizeInBytes();
-      int numRowsPerChunk = maxBlockSize / rowSizeInBytes;
+      int estimatedRowSizeInBytes = block.getDataSchema().getColumnNames().length * 8;
+      int numRowsPerChunk = maxBlockSize / estimatedRowSizeInBytes;
       Preconditions.checkState(numRowsPerChunk > 0, "row size too large for query engine to handle, abort!");
 
-      int totalNumRows = block.getNumRows();
-      List<Object[]> allRows = block.getContainer();
-      int currentRow = 0;
-      while (currentRow < totalNumRows) {
-        List<Object[]> chunk = allRows.subList(currentRow, Math.min(currentRow + numRowsPerChunk, allRows.size()));
-        currentRow += numRowsPerChunk;
+      Collection<Object[]> allRows = block.getContainer();
+      DataSchema dataSchema = block.getDataSchema();
+      int rowId = 0;
+      List<Object[]> chunk = new ArrayList<>(numRowsPerChunk);
+      for (Object[] row : allRows) {
+        if (isCleanupRequired) {
+          chunk.add(cleanupRow(row, dataSchema));
+        } else {
+          chunk.add(row);
+        }
+        rowId++;
+        if (rowId % numRowsPerChunk == 0) {
+          blockChunks.add(new TransferableBlock(chunk, block.getDataSchema(), block.getType()));
+          chunk = new ArrayList<>();
+        }
+      }
+      if (chunk.size() > 0) {
         blockChunks.add(new TransferableBlock(chunk, block.getDataSchema(), block.getType()));
       }
     }
     return blockChunks;
   }
 
-  public static Object[] getRow(TransferableBlock transferableBlock, int rowId) {
-    Preconditions.checkState(transferableBlock.getType() == DataBlock.Type.ROW,
-        "TransferableBlockUtils doesn't support get row from non-ROW-based data block type yet!");
-    return transferableBlock.getContainer().get(rowId);
+  private static Object[] cleanupRow(Object[] row, DataSchema dataSchema) {

Review Comment:
   Yeah, maybe `canonicalizeRow`?


