You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by GitBox <gi...@apache.org> on 2022/09/27 19:37:37 UTC

[GitHub] [pinot] 61yao commented on a diff in pull request #9422: [multistage] [enhancement] Split transferable block when the size is too large

61yao commented on code in PR #9422:
URL: https://github.com/apache/pinot/pull/9422#discussion_r981641204


##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/blocks/TransferableBlock.java:
##########
@@ -156,7 +157,7 @@ public boolean isErrorBlock() {
 
   public byte[] toBytes()
       throws IOException {
-    return _dataBlock.toBytes();
+    return getDataBlock().toBytes();
   }

Review Comment:
   Done



##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/blocks/TransferableBlockUtils.java:
##########
@@ -43,4 +45,45 @@ public static TransferableBlock getErrorTransferableBlock(Map<Integer, String> e
   public static boolean isEndOfStream(TransferableBlock transferableBlock) {
     return transferableBlock.isEndOfStreamBlock();
   }
+
+  /**
+   *  Split a block into multiple blocks so that each block's size is within maxBlockSize.
+   *  Currently, splitting is only supported for row-type data blocks.
+   *  When the row size is greater than maxBlockSize, each row is packed as a separate block.
+   */
+  public static List<TransferableBlock> splitBlock(TransferableBlock block, int maxBlockSize) {

Review Comment:
   Done



##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxSendOperator.java:
##########
@@ -192,14 +199,17 @@ private static List<BaseDataBlock> constructPartitionedDataBlock(BaseDataBlock d
     }
   }
 
-  private void sendDataTableBlock(ServerInstance serverInstance, BaseDataBlock dataBlock)
+  private void sendDataTableBlock(ServerInstance serverInstance, TransferableBlock block)
       throws IOException {
+    List<TransferableBlock> chunks = TransferableBlockUtils.splitBlock(block, _maxBlockSize);
     String mailboxId = toMailboxId(serverInstance);
-    SendingMailbox<Mailbox.MailboxContent> sendingMailbox = _mailboxService.getSendingMailbox(mailboxId);
-    Mailbox.MailboxContent mailboxContent = toMailboxContent(mailboxId, dataBlock);
-    sendingMailbox.send(mailboxContent);
-    if (mailboxContent.getMetadataMap().containsKey(ChannelUtils.MAILBOX_METADATA_END_OF_STREAM_KEY)) {
-      sendingMailbox.complete();
+    for (TransferableBlock chunk : chunks) {
+      SendingMailbox<Mailbox.MailboxContent> sendingMailbox = _mailboxService.getSendingMailbox(mailboxId);
+      Mailbox.MailboxContent mailboxContent = toMailboxContent(mailboxId, chunk.getDataBlock());
+      sendingMailbox.send(mailboxContent);
+      if (mailboxContent.getMetadataMap().containsKey(ChannelUtils.MAILBOX_METADATA_END_OF_STREAM_KEY)) {
+        sendingMailbox.complete();
+      }

Review Comment:
   Done



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org