You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2021/11/10 10:01:52 UTC

[GitHub] [pulsar] gaoran10 opened a new pull request #12720: [Pulsar SQL] Support query chunked messages feature in Pulsar SQL

gaoran10 opened a new pull request #12720:
URL: https://github.com/apache/pulsar/pull/12720


   ### Motivation
   
   Currently, the Pulsar SQL didn't support query chunked messages.
   
   ### Modifications
   
   Add a chunked message map in `PulsarRecordCursor` to maintain incomplete chunked messages, if one chunked message was received completely, it will be offered in the message queue to wait for deserialization.
   
   ### Verifying this change
   
   Add a unit test to verify querying chunked messages normally.
   
   ### Does this pull request potentially affect one of the following parts:
   
   *If `yes` was chosen, please highlight the changes*
   
     - Dependencies (does it add or upgrade a dependency): (no)
     - The public API: (no)
     - The schema: (no)
     - The default values of configurations: (no)
     - The wire protocol: (no)
     - The rest endpoints: (no)
     - The admin cli options: (no)
     - Anything that affects deployment: (no)
   
   ### Documentation
   
   Check the box below and label this PR (if you have committer privilege).
   
   Need to update docs? 
   
   - [ ] `doc-required` 
     
     (If you need help on updating docs, create a doc issue)
     
   - [x] `no-need-doc` 
     
     (Please explain why)
     
   - [ ] `doc` 
     
     (If this PR contains doc changes)
   
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] gaoran10 commented on a change in pull request #12720: [Pulsar SQL] Support query chunked messages feature in Pulsar SQL

Posted by GitBox <gi...@apache.org>.
gaoran10 commented on a change in pull request #12720:
URL: https://github.com/apache/pulsar/pull/12720#discussion_r750000070



##########
File path: pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
##########
@@ -479,6 +479,13 @@ public void sendAsync(Message<?> message, SendCallback callback) {
                 }
                 String uuid = totalChunks > 1 ? String.format("%s-%d", producerName, sequenceId) : null;
                 for (int chunkId = 0; chunkId < totalChunks; chunkId++) {
+                    // Need to reset the schemaVersion, because the schemaVersion is a ByteBuf object in
+                    // `MessageMetadata`, if we want to re-serialize the `SEND` command using a same `MessageMetadata`,
+                    // we need to reset the ByteBuf of the schemaVersion in `MessageMetadata`, I think we need to
+                    // reset `ByteBuf` objects in `MessageMetadata` after call the method `MessageMetadata#writeTo()`.
+                    if (chunkId > 0 && msg.getMessageBuilder().hasSchemaVersion()) {

Review comment:
       The `MessageMetadata` is used to serialize protobuf data, if we want to call the method `writeTo()` of the class `MessageMetadata` multi times, it's better to modify the logic of the `MessageMetadata`, the read index of `ByteBuf` object should be reset after writing.
   
   

##########
File path: pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
##########
@@ -479,6 +479,13 @@ public void sendAsync(Message<?> message, SendCallback callback) {
                 }
                 String uuid = totalChunks > 1 ? String.format("%s-%d", producerName, sequenceId) : null;
                 for (int chunkId = 0; chunkId < totalChunks; chunkId++) {
+                    // Need to reset the schemaVersion, because the schemaVersion is a ByteBuf object in
+                    // `MessageMetadata`, if we want to re-serialize the `SEND` command using a same `MessageMetadata`,
+                    // we need to reset the ByteBuf of the schemaVersion in `MessageMetadata`, I think we need to
+                    // reset `ByteBuf` objects in `MessageMetadata` after call the method `MessageMetadata#writeTo()`.
+                    if (chunkId > 0 && msg.getMessageBuilder().hasSchemaVersion()) {

Review comment:
       The `MessageMetadata` is used to serialize protobuf data, if we want to call the method `writeTo()` of the class `MessageMetadata` multi times, maybe it's better to modify the logic of the `MessageMetadata`, the read index of `ByteBuf` object should be reset after writing.
   
   




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] MarvinCai commented on pull request #12720: [Pulsar SQL] Support query chunked messages feature in Pulsar SQL

Posted by GitBox <gi...@apache.org>.
MarvinCai commented on pull request #12720:
URL: https://github.com/apache/pulsar/pull/12720#issuecomment-976354397


   /pulsarbot run-failure-checks


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] codelipenghui merged pull request #12720: [Pulsar SQL] Support query chunked messages feature in Pulsar SQL

Posted by GitBox <gi...@apache.org>.
codelipenghui merged pull request #12720:
URL: https://github.com/apache/pulsar/pull/12720


   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] gaoran10 commented on a change in pull request #12720: [Pulsar SQL] Support query chunked messages feature in Pulsar SQL

Posted by GitBox <gi...@apache.org>.
gaoran10 commented on a change in pull request #12720:
URL: https://github.com/apache/pulsar/pull/12720#discussion_r749987884



##########
File path: pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
##########
@@ -60,6 +60,7 @@
 import java.util.function.Function;
 import java.util.stream.Collectors;
 
+import lombok.Data;

Review comment:
       Oh, I miss removing this. Thanks




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] codelipenghui commented on pull request #12720: [Pulsar SQL] Support query chunked messages feature in Pulsar SQL

Posted by GitBox <gi...@apache.org>.
codelipenghui commented on pull request #12720:
URL: https://github.com/apache/pulsar/pull/12720#issuecomment-986363326


   @RobertIndie Could you please help review this PR since you are working on the chunk message fixes for now.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] MarvinCai commented on pull request #12720: [Pulsar SQL] Support query chunked messages feature in Pulsar SQL

Posted by GitBox <gi...@apache.org>.
MarvinCai commented on pull request #12720:
URL: https://github.com/apache/pulsar/pull/12720#issuecomment-976100458


   /pulsarbot run-failure-checks


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] gaoran10 commented on a change in pull request #12720: [Pulsar SQL] Support query chunked messages feature in Pulsar SQL

Posted by GitBox <gi...@apache.org>.
gaoran10 commented on a change in pull request #12720:
URL: https://github.com/apache/pulsar/pull/12720#discussion_r750000070



##########
File path: pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
##########
@@ -479,6 +479,13 @@ public void sendAsync(Message<?> message, SendCallback callback) {
                 }
                 String uuid = totalChunks > 1 ? String.format("%s-%d", producerName, sequenceId) : null;
                 for (int chunkId = 0; chunkId < totalChunks; chunkId++) {
+                    // Need to reset the schemaVersion, because the schemaVersion is a ByteBuf object in
+                    // `MessageMetadata`, if we want to re-serialize the `SEND` command using a same `MessageMetadata`,
+                    // we need to reset the ByteBuf of the schemaVersion in `MessageMetadata`, I think we need to
+                    // reset `ByteBuf` objects in `MessageMetadata` after call the method `MessageMetadata#writeTo()`.
+                    if (chunkId > 0 && msg.getMessageBuilder().hasSchemaVersion()) {

Review comment:
       If we want to call the method `writeTo()` of the class `MessageMetadata` multi times, it's better to modify the logic of the `MessageMetadata`, the read index of `ByteBuf` object should be reset after writing.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] gaoran10 commented on pull request #12720: [Pulsar SQL] Support query chunked messages feature in Pulsar SQL

Posted by GitBox <gi...@apache.org>.
gaoran10 commented on pull request #12720:
URL: https://github.com/apache/pulsar/pull/12720#issuecomment-969993636


   > so we're simply skipping chunked message that span cross different Presto split right?
   
   Hi, @MarvinCai , if one chunked message crosses different splits, we'll ensure the split which owns the beginning chunk that should receive all chunked messages belonging to the chunk, so we need to read more entries beyond the split end position if there are incomplete chunked messages. The next split will ignore chunked messages if it didn't maintain the first chunk.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] MarvinCai commented on a change in pull request #12720: [Pulsar SQL] Support query chunked messages feature in Pulsar SQL

Posted by GitBox <gi...@apache.org>.
MarvinCai commented on a change in pull request #12720:
URL: https://github.com/apache/pulsar/pull/12720#discussion_r749890992



##########
File path: pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
##########
@@ -479,6 +479,13 @@ public void sendAsync(Message<?> message, SendCallback callback) {
                 }
                 String uuid = totalChunks > 1 ? String.format("%s-%d", producerName, sequenceId) : null;
                 for (int chunkId = 0; chunkId < totalChunks; chunkId++) {
+                    // Need to reset the schemaVersion, because the schemaVersion is a ByteBuf object in
+                    // `MessageMetadata`, if we want to re-serialize the `SEND` command using a same `MessageMetadata`,
+                    // we need to reset the ByteBuf of the schemaVersion in `MessageMetadata`, I think we need to
+                    // reset `ByteBuf` objects in `MessageMetadata` after call the method `MessageMetadata#writeTo()`.
+                    if (chunkId > 0 && msg.getMessageBuilder().hasSchemaVersion()) {

Review comment:
       seems getSchemaVersion() will copy data around, might be good if we just store the result as it won't change




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] gaoran10 commented on a change in pull request #12720: [Pulsar SQL] Support query chunked messages feature in Pulsar SQL

Posted by GitBox <gi...@apache.org>.
gaoran10 commented on a change in pull request #12720:
URL: https://github.com/apache/pulsar/pull/12720#discussion_r749986299



##########
File path: pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarRecordCursor.java
##########
@@ -706,4 +730,94 @@ private void initEntryCacheSizeAllocator(PulsarConnectorConfig connectorConfig)
         }
     }
 
+    private RawMessage processChunkedMessages(RawMessage message) {
+        final String uuid = message.getUUID();
+        final int chunkId = message.getChunkId();
+        final int totalChunkMsgSize = message.getTotalChunkMsgSize();
+        final int numChunks = message.getNumChunksFromMsg();
+
+        RawMessageIdImpl rawMessageId = (RawMessageIdImpl) message.getMessageId();
+        if (rawMessageId.getLedgerId() > pulsarSplit.getEndPositionLedgerId()
+                && !chunkedMessagesMap.containsKey(uuid)) {
+            // If the message is out of the split range, we only care about the incomplete chunked messages.
+            message.release();
+            return null;
+        }
+        if (chunkId == 0) {
+            ByteBuf chunkedMsgBuffer = Unpooled.directBuffer(totalChunkMsgSize, totalChunkMsgSize);
+            chunkedMessagesMap.computeIfAbsent(uuid, (key) -> ChunkedMessageCtx.get(numChunks, chunkedMsgBuffer));
+        }
+
+        ChunkedMessageCtx chunkedMsgCtx = chunkedMessagesMap.get(uuid);
+        if (chunkedMsgCtx == null || chunkedMsgCtx.chunkedMsgBuffer == null
+                || chunkId != (chunkedMsgCtx.lastChunkedMessageId + 1) || chunkId >= numChunks) {
+            // Means we lost the first chunk, it will happens when the beginning chunk didn't belong to this split.

Review comment:
       Thanks




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] gaoran10 commented on a change in pull request #12720: [Pulsar SQL] Support query chunked messages feature in Pulsar SQL

Posted by GitBox <gi...@apache.org>.
gaoran10 commented on a change in pull request #12720:
URL: https://github.com/apache/pulsar/pull/12720#discussion_r749986126



##########
File path: pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarRecordCursor.java
##########
@@ -278,15 +287,25 @@ public void accept(Entry entry) {
                                                 // start time for message queue read
                                                 metricsTracker.start_MESSAGE_QUEUE_ENQUEUE_WAIT_TIME();
 
-                                                while (true) {
-                                                    if (!haveAvailableCacheSize(
-                                                            messageQueueCacheSizeAllocator, messageQueue)
-                                                            || !messageQueue.offer(message)) {
-                                                        Thread.sleep(1);
-                                                    } else {
-                                                        messageQueueCacheSizeAllocator.allocate(
-                                                                message.getData().readableBytes());
-                                                        break;
+                                                if (message.getNumChunksFromMsg() > 1)  {
+                                                    message = processChunkedMessages(message);
+                                                } else if (entryExceedSplitEndPosition(entry)) {

Review comment:
       ```
   # The code block at line 276.
   if (entryExceedSplitEndPosition(entry) && chunkedMessagesMap.isEmpty()) {
       return;
   }
   ```
   If the read position exceeds the split end position and the chunkedMessagesMap isn't empty, the check at line 292 may be hit. If the entry exceeds the split end position, we only care about entries that are related to incomplete chunked messages, the no-chunked messages should be released.
   
   




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] MarvinCai commented on pull request #12720: [Pulsar SQL] Support query chunked messages feature in Pulsar SQL

Posted by GitBox <gi...@apache.org>.
MarvinCai commented on pull request #12720:
URL: https://github.com/apache/pulsar/pull/12720#issuecomment-976234384


   /pulsarbot run-failure-checks


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] codelipenghui commented on pull request #12720: [Pulsar SQL] Support query chunked messages feature in Pulsar SQL

Posted by GitBox <gi...@apache.org>.
codelipenghui commented on pull request #12720:
URL: https://github.com/apache/pulsar/pull/12720#issuecomment-965007501


   @MarvinCai Could you please help review this PR?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] gaoran10 commented on a change in pull request #12720: [Pulsar SQL] Support query chunked messages feature in Pulsar SQL

Posted by GitBox <gi...@apache.org>.
gaoran10 commented on a change in pull request #12720:
URL: https://github.com/apache/pulsar/pull/12720#discussion_r749994015



##########
File path: pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarRecordCursor.java
##########
@@ -706,4 +730,94 @@ private void initEntryCacheSizeAllocator(PulsarConnectorConfig connectorConfig)
         }
     }
 
+    private RawMessage processChunkedMessages(RawMessage message) {
+        final String uuid = message.getUUID();
+        final int chunkId = message.getChunkId();
+        final int totalChunkMsgSize = message.getTotalChunkMsgSize();
+        final int numChunks = message.getNumChunksFromMsg();
+
+        RawMessageIdImpl rawMessageId = (RawMessageIdImpl) message.getMessageId();
+        if (rawMessageId.getLedgerId() > pulsarSplit.getEndPositionLedgerId()
+                && !chunkedMessagesMap.containsKey(uuid)) {
+            // If the message is out of the split range, we only care about the incomplete chunked messages.
+            message.release();
+            return null;
+        }
+        if (chunkId == 0) {
+            ByteBuf chunkedMsgBuffer = Unpooled.directBuffer(totalChunkMsgSize, totalChunkMsgSize);
+            chunkedMessagesMap.computeIfAbsent(uuid, (key) -> ChunkedMessageCtx.get(numChunks, chunkedMsgBuffer));
+        }
+
+        ChunkedMessageCtx chunkedMsgCtx = chunkedMessagesMap.get(uuid);
+        if (chunkedMsgCtx == null || chunkedMsgCtx.chunkedMsgBuffer == null
+                || chunkId != (chunkedMsgCtx.lastChunkedMessageId + 1) || chunkId >= numChunks) {
+            // Means we lost the first chunk, it will happens when the beginning chunk didn't belong to this split.
+            log.info("Received unexpected chunk. messageId: %s, last-chunk-id: %s chunkId: %s, totalChunks: %s",
+                    message.getMessageId(),
+                    (chunkedMsgCtx != null ? chunkedMsgCtx.lastChunkedMessageId : null), chunkId,
+                    numChunks);
+            if (chunkedMsgCtx != null) {
+                if (chunkedMsgCtx.chunkedMsgBuffer != null) {
+                    ReferenceCountUtil.safeRelease(chunkedMsgCtx.chunkedMsgBuffer);
+                }
+                chunkedMsgCtx.recycle();
+            }
+            chunkedMessagesMap.remove(uuid);
+            message.release();
+            return null;
+        }
+
+        // append the chunked payload and update lastChunkedMessage-id
+        chunkedMsgCtx.chunkedMsgBuffer.writeBytes(message.getData());
+        chunkedMsgCtx.lastChunkedMessageId = chunkId;
+
+        // if final chunk is not received yet then release payload and return
+        if (chunkId != (numChunks - 1)) {
+            message.release();
+            return null;
+        }
+
+        if (log.isDebugEnabled()) {
+            log.debug("Chunked message completed. chunkId: %s, totalChunks: %s, msgId: %s, sequenceId: %s",
+                    chunkId, numChunks, rawMessageId, message.getSequenceId());
+        }
+        chunkedMessagesMap.remove(uuid);
+        ByteBuf unCompressedPayload = chunkedMsgCtx.chunkedMsgBuffer;
+        chunkedMsgCtx.recycle();
+        return ((RawMessageImpl) message).updatePayloadForChunkedMessage(unCompressedPayload);
+    }
+

Review comment:
       Ok




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] MarvinCai commented on pull request #12720: [Pulsar SQL] Support query chunked messages feature in Pulsar SQL

Posted by GitBox <gi...@apache.org>.
MarvinCai commented on pull request #12720:
URL: https://github.com/apache/pulsar/pull/12720#issuecomment-976374184


   @gaoran10 the unit test keep failing, can you please take a look?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] MarvinCai commented on pull request #12720: [Pulsar SQL] Support query chunked messages feature in Pulsar SQL

Posted by GitBox <gi...@apache.org>.
MarvinCai commented on pull request #12720:
URL: https://github.com/apache/pulsar/pull/12720#issuecomment-976190477


   /pulsarbot run-failure-checks


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] gaoran10 commented on pull request #12720: [Pulsar SQL] Support query chunked messages feature in Pulsar SQL

Posted by GitBox <gi...@apache.org>.
gaoran10 commented on pull request #12720:
URL: https://github.com/apache/pulsar/pull/12720#issuecomment-978894732


   > @gaoran10 the unit test keep failing, can you please take a look?
   
   Sorry late response, I'll take a look.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] MarvinCai commented on a change in pull request #12720: [Pulsar SQL] Support query chunked messages feature in Pulsar SQL

Posted by GitBox <gi...@apache.org>.
MarvinCai commented on a change in pull request #12720:
URL: https://github.com/apache/pulsar/pull/12720#discussion_r749802832



##########
File path: pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
##########
@@ -60,6 +60,7 @@
 import java.util.function.Function;
 import java.util.stream.Collectors;
 
+import lombok.Data;

Review comment:
       nit: seems unnecessary import.

##########
File path: pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarRecordCursor.java
##########
@@ -706,4 +730,94 @@ private void initEntryCacheSizeAllocator(PulsarConnectorConfig connectorConfig)
         }
     }
 
+    private RawMessage processChunkedMessages(RawMessage message) {
+        final String uuid = message.getUUID();
+        final int chunkId = message.getChunkId();
+        final int totalChunkMsgSize = message.getTotalChunkMsgSize();
+        final int numChunks = message.getNumChunksFromMsg();
+
+        RawMessageIdImpl rawMessageId = (RawMessageIdImpl) message.getMessageId();
+        if (rawMessageId.getLedgerId() > pulsarSplit.getEndPositionLedgerId()
+                && !chunkedMessagesMap.containsKey(uuid)) {
+            // If the message is out of the split range, we only care about the incomplete chunked messages.
+            message.release();
+            return null;
+        }
+        if (chunkId == 0) {
+            ByteBuf chunkedMsgBuffer = Unpooled.directBuffer(totalChunkMsgSize, totalChunkMsgSize);
+            chunkedMessagesMap.computeIfAbsent(uuid, (key) -> ChunkedMessageCtx.get(numChunks, chunkedMsgBuffer));
+        }
+
+        ChunkedMessageCtx chunkedMsgCtx = chunkedMessagesMap.get(uuid);
+        if (chunkedMsgCtx == null || chunkedMsgCtx.chunkedMsgBuffer == null
+                || chunkId != (chunkedMsgCtx.lastChunkedMessageId + 1) || chunkId >= numChunks) {
+            // Means we lost the first chunk, it will happens when the beginning chunk didn't belong to this split.
+            log.info("Received unexpected chunk. messageId: %s, last-chunk-id: %s chunkId: %s, totalChunks: %s",
+                    message.getMessageId(),
+                    (chunkedMsgCtx != null ? chunkedMsgCtx.lastChunkedMessageId : null), chunkId,
+                    numChunks);
+            if (chunkedMsgCtx != null) {
+                if (chunkedMsgCtx.chunkedMsgBuffer != null) {
+                    ReferenceCountUtil.safeRelease(chunkedMsgCtx.chunkedMsgBuffer);
+                }
+                chunkedMsgCtx.recycle();
+            }
+            chunkedMessagesMap.remove(uuid);
+            message.release();
+            return null;
+        }
+
+        // append the chunked payload and update lastChunkedMessage-id
+        chunkedMsgCtx.chunkedMsgBuffer.writeBytes(message.getData());
+        chunkedMsgCtx.lastChunkedMessageId = chunkId;
+
+        // if final chunk is not received yet then release payload and return
+        if (chunkId != (numChunks - 1)) {
+            message.release();
+            return null;
+        }
+
+        if (log.isDebugEnabled()) {
+            log.debug("Chunked message completed. chunkId: %s, totalChunks: %s, msgId: %s, sequenceId: %s",
+                    chunkId, numChunks, rawMessageId, message.getSequenceId());
+        }
+        chunkedMessagesMap.remove(uuid);
+        ByteBuf unCompressedPayload = chunkedMsgCtx.chunkedMsgBuffer;
+        chunkedMsgCtx.recycle();
+        return ((RawMessageImpl) message).updatePayloadForChunkedMessage(unCompressedPayload);
+    }
+

Review comment:
       if I understand correctly, this is to buffer the chunk message payload till we receive the whole message, it'll be good if we can add brief doc for it.

##########
File path: pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarRecordCursor.java
##########
@@ -706,4 +730,94 @@ private void initEntryCacheSizeAllocator(PulsarConnectorConfig connectorConfig)
         }
     }
 
+    private RawMessage processChunkedMessages(RawMessage message) {
+        final String uuid = message.getUUID();
+        final int chunkId = message.getChunkId();
+        final int totalChunkMsgSize = message.getTotalChunkMsgSize();
+        final int numChunks = message.getNumChunksFromMsg();
+
+        RawMessageIdImpl rawMessageId = (RawMessageIdImpl) message.getMessageId();
+        if (rawMessageId.getLedgerId() > pulsarSplit.getEndPositionLedgerId()
+                && !chunkedMessagesMap.containsKey(uuid)) {
+            // If the message is out of the split range, we only care about the incomplete chunked messages.
+            message.release();
+            return null;
+        }
+        if (chunkId == 0) {
+            ByteBuf chunkedMsgBuffer = Unpooled.directBuffer(totalChunkMsgSize, totalChunkMsgSize);
+            chunkedMessagesMap.computeIfAbsent(uuid, (key) -> ChunkedMessageCtx.get(numChunks, chunkedMsgBuffer));
+        }
+
+        ChunkedMessageCtx chunkedMsgCtx = chunkedMessagesMap.get(uuid);
+        if (chunkedMsgCtx == null || chunkedMsgCtx.chunkedMsgBuffer == null
+                || chunkId != (chunkedMsgCtx.lastChunkedMessageId + 1) || chunkId >= numChunks) {
+            // Means we lost the first chunk, it will happens when the beginning chunk didn't belong to this split.

Review comment:
       ```suggestion
               // Means we lost the first chunk, it will happen when the beginning chunk didn't belong to this split.
   ```

##########
File path: pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarRecordCursor.java
##########
@@ -278,15 +287,25 @@ public void accept(Entry entry) {
                                                 // start time for message queue read
                                                 metricsTracker.start_MESSAGE_QUEUE_ENQUEUE_WAIT_TIME();
 
-                                                while (true) {
-                                                    if (!haveAvailableCacheSize(
-                                                            messageQueueCacheSizeAllocator, messageQueue)
-                                                            || !messageQueue.offer(message)) {
-                                                        Thread.sleep(1);
-                                                    } else {
-                                                        messageQueueCacheSizeAllocator.allocate(
-                                                                message.getData().readableBytes());
-                                                        break;
+                                                if (message.getNumChunksFromMsg() > 1)  {
+                                                    message = processChunkedMessages(message);
+                                                } else if (entryExceedSplitEndPosition(entry)) {

Review comment:
       will this ever be true? as we've checked at line 276




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org