Posted to commits@pinot.apache.org by GitBox <gi...@apache.org> on 2022/09/27 16:25:11 UTC

[GitHub] [pinot] walterddr commented on a diff in pull request #9422: [multistage] [enhancement] Split transferable block when the size is too large

walterddr commented on code in PR #9422:
URL: https://github.com/apache/pinot/pull/9422#discussion_r981451980


##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxSendOperator.java:
##########
@@ -192,14 +199,17 @@ private static List<BaseDataBlock> constructPartitionedDataBlock(BaseDataBlock d
     }
   }
 
-  private void sendDataTableBlock(ServerInstance serverInstance, BaseDataBlock dataBlock)
+  private void sendDataTableBlock(ServerInstance serverInstance, TransferableBlock block)
       throws IOException {
+    List<TransferableBlock> chunks = TransferableBlockUtils.splitBlock(block, _maxBlockSize);
     String mailboxId = toMailboxId(serverInstance);
-    SendingMailbox<Mailbox.MailboxContent> sendingMailbox = _mailboxService.getSendingMailbox(mailboxId);
-    Mailbox.MailboxContent mailboxContent = toMailboxContent(mailboxId, dataBlock);
-    sendingMailbox.send(mailboxContent);
-    if (mailboxContent.getMetadataMap().containsKey(ChannelUtils.MAILBOX_METADATA_END_OF_STREAM_KEY)) {
-      sendingMailbox.complete();
+    for (TransferableBlock chunk : chunks) {
+      SendingMailbox<Mailbox.MailboxContent> sendingMailbox = _mailboxService.getSendingMailbox(mailboxId);
+      Mailbox.MailboxContent mailboxContent = toMailboxContent(mailboxId, chunk.getDataBlock());
+      sendingMailbox.send(mailboxContent);
+      if (mailboxContent.getMetadataMap().containsKey(ChannelUtils.MAILBOX_METADATA_END_OF_STREAM_KEY)) {
+        sendingMailbox.complete();
+      }

Review Comment:
   This check should be done outside of the loop. Maybe we can just check whether the transferable block contains a metadata block.
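
A minimal sketch of the suggested shape, assuming the end-of-stream decision can be made once per block rather than per chunk. `RecordingMailbox` and `ChunkSender` are hypothetical stand-ins for illustration, not the real `SendingMailbox` or `MailboxSendOperator` API; chunks are plain strings here.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for a sending mailbox; it only records the calls it receives.
class RecordingMailbox {
  final List<String> events = new ArrayList<>();

  void send(String content) {
    events.add("send:" + content);
  }

  void complete() {
    events.add("complete");
  }
}

// Hypothetical helper showing the control flow only.
class ChunkSender {
  // Sends every chunk first, then completes the mailbox at most once, after the loop.
  static void sendChunks(RecordingMailbox mailbox, List<String> chunks, boolean endOfStream) {
    for (String chunk : chunks) {
      mailbox.send(chunk);
    }
    // The end-of-stream check is hoisted out of the loop.
    if (endOfStream) {
      mailbox.complete();
    }
  }
}
```

With this shape the mailbox lookup can also be hoisted above the loop, since the mailbox ID does not change between chunks.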



##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/blocks/TransferableBlock.java:
##########
@@ -156,7 +157,7 @@ public boolean isErrorBlock() {
 
   public byte[] toBytes()
       throws IOException {
-    return _dataBlock.toBytes();
+    return getDataBlock().toBytes();
   }

Review Comment:
   Add a new API call and rename the current one:
   ```
   public byte[] getDataBlock() {
     // ...
   }
   public List<byte[]> getDataBlockTrunk() {
     // ...
   }
   ```
   



##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/blocks/TransferableBlockUtils.java:
##########
@@ -43,4 +45,45 @@ public static TransferableBlock getErrorTransferableBlock(Map<Integer, String> e
   public static boolean isEndOfStream(TransferableBlock transferableBlock) {
     return transferableBlock.isEndOfStreamBlock();
   }
+
+  /**
+   *  Split a block into multiple block so that each block size is within maxBlockSize.
+   *  Currently, we only support split for row type dataBlock.
+   *  When row size is greater than maxBlockSize, we pack each row as a separate block.
+   */
+  public static List<TransferableBlock> splitBlock(TransferableBlock block, int maxBlockSize) {

Review Comment:
   Don't make this public; encapsulate it inside the transferable block.
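
A rough sketch of what that encapsulation could look like, assuming rows can be treated as opaque byte arrays. `RowBlock` and `getChunks` are hypothetical names used for illustration, not the real `TransferableBlock` API, and the greedy packing below is one possible splitting policy rather than the one in the PR.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical simplified block: each row is an opaque byte array.
final class RowBlock {
  private final List<byte[]> rows;

  RowBlock(List<byte[]> rows) {
    this.rows = rows;
  }

  int numRows() {
    return rows.size();
  }

  // Splitting lives on the block itself, so callers never need a public utility method.
  // Rows are packed greedily; a row larger than maxBlockSize ends up alone in its own chunk.
  List<RowBlock> getChunks(int maxBlockSize) {
    List<RowBlock> chunks = new ArrayList<>();
    List<byte[]> current = new ArrayList<>();
    int currentSize = 0;
    for (byte[] row : rows) {
      // Close the current chunk if adding this row would exceed the limit.
      if (!current.isEmpty() && currentSize + row.length > maxBlockSize) {
        chunks.add(new RowBlock(current));
        current = new ArrayList<>();
        currentSize = 0;
      }
      current.add(row);
      currentSize += row.length;
    }
    if (!current.isEmpty()) {
      chunks.add(new RowBlock(current));
    }
    return chunks;
  }
}
```

The sender would then only call something like `block.getChunks(maxBlockSize)` and iterate over the result.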



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org

