Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2019/10/22 23:00:44 UTC

[GitHub] [pulsar] rdhabalia commented on a change in pull request #5443: [pulsar-client] Fix message corruption on OOM for batch messages

rdhabalia commented on a change in pull request #5443: [pulsar-client] Fix message corruption on OOM for batch messages
URL: https://github.com/apache/pulsar/pull/5443#discussion_r337786504
 
 

 ##########
 File path: pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageContainerImpl.java
 ##########
 @@ -82,11 +82,32 @@ public void add(MessageImpl<?> msg, SendCallback callback) {
     }
 
     private ByteBuf getCompressedBatchMetadataAndPayload() {
-        for (MessageImpl<?> msg : messages) {
+        batchedMessageMetadataAndPayload.markWriterIndex();
+        batchedMessageMetadataAndPayload.markReaderIndex();
+        int lastSerializedMessageIndex = 0;
+        
+        for(MessageImpl<?> msg : messages) {
             PulsarApi.MessageMetadata.Builder msgBuilder = msg.getMessageBuilder();
-            batchedMessageMetadataAndPayload = Commands.serializeSingleMessageInBatchWithPayload(msgBuilder,
-                    msg.getDataBuffer(), batchedMessageMetadataAndPayload);
-            msgBuilder.recycle();
+            msg.getDataBuffer().markReaderIndex();
+            try {
+                batchedMessageMetadataAndPayload = Commands.serializeSingleMessageInBatchWithPayload(msgBuilder,
+                        msg.getDataBuffer(), batchedMessageMetadataAndPayload);
+            } catch (Throwable th) {
+                // serializing batch message can corrupt the index of message and batch-message. Reset the index so,
+                // next iteration doesn't send corrupt message to broker.
+                for (int j = 0; j <= lastSerializedMessageIndex; j++) {
+                    MessageImpl<?> previousMsg = messages.get(j);
+                    previousMsg.getDataBuffer().resetReaderIndex();
+                }
+                batchedMessageMetadataAndPayload.resetWriterIndex();
+                batchedMessageMetadataAndPayload.resetReaderIndex();
+                throw new RuntimeException(th);
+            }
+            lastSerializedMessageIndex++;
+        }
+        // Recycle messages only once they serialized successfully in batch 
+        for (MessageImpl<?> msg : messages) {
+            msg.getMessageBuilder().recycle();
 
 Review comment:
   Yes, this one is necessary: the builders have to be recycled together, only after all messages have been serialized. Otherwise serialization throws an NPE when the batch-scheduler tries to use an already recycled msgBuilder.
   e.g.:
   1. The batch has 4 messages; the first 2 are serialized successfully and their builders have been recycled.
   2. Serialization of the 3rd message fails with an exception.
   3. The batch-scheduler tries again to serialize all 4 messages, and this time serialization throws an NPE for the first 2 messages because their builders are already recycled.
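
   The scenario above can be reproduced with a small standalone sketch. It does not use Pulsar's classes: PooledBuilder, FlakySerializer and the method names are hypothetical stand-ins, and the one-time failure on the 3rd message only simulates an error such as an OOM. It compares recycling each builder right after it is serialized against recycling all of them after the loop.

```java
import java.util.ArrayList;
import java.util.List;

public class RecycleAfterBatchSketch {

    /** Hypothetical stand-in for a pooled builder; NOT Pulsar's MessageMetadata.Builder. */
    static final class PooledBuilder {
        private String payload;
        PooledBuilder(String payload) { this.payload = payload; }
        // Using a recycled builder dereferences null, which surfaces as an NPE.
        int serialize() { return payload.length(); }
        void recycle() { payload = null; }
    }

    /** Hypothetical serializer that fails exactly once at a chosen message index. */
    static final class FlakySerializer {
        private final int failAtIndex;
        private boolean alreadyFailed = false;
        FlakySerializer(int failAtIndex) { this.failAtIndex = failAtIndex; }
        int serialize(PooledBuilder builder, int index) {
            if (index == failAtIndex && !alreadyFailed) {
                alreadyFailed = true;
                throw new IllegalStateException("simulated failure at message " + index);
            }
            return builder.serialize();
        }
    }

    // Problematic order: each builder is recycled as soon as it is serialized.
    static int serializeRecyclingEagerly(List<PooledBuilder> batch, FlakySerializer s) {
        int total = 0;
        for (int i = 0; i < batch.size(); i++) {
            total += s.serialize(batch.get(i), i);
            batch.get(i).recycle();
        }
        return total;
    }

    // Order described in the comment: recycle only once the whole batch serialized.
    static int serializeThenRecycleAll(List<PooledBuilder> batch, FlakySerializer s) {
        int total = 0;
        for (int i = 0; i < batch.size(); i++) {
            total += s.serialize(batch.get(i), i);
        }
        for (PooledBuilder builder : batch) {
            builder.recycle();
        }
        return total;
    }

    static List<PooledBuilder> newBatch() {
        List<PooledBuilder> batch = new ArrayList<>();
        for (int i = 1; i <= 4; i++) {
            batch.add(new PooledBuilder("m" + i));
        }
        return batch;
    }

    // First attempt fails on the 3rd message; the retry plays the role of the batch-scheduler.
    static String attemptTwice(boolean eager) {
        List<PooledBuilder> batch = newBatch();
        FlakySerializer s = new FlakySerializer(2);
        try {
            if (eager) serializeRecyclingEagerly(batch, s); else serializeThenRecycleAll(batch, s);
            return "first attempt unexpectedly succeeded";
        } catch (IllegalStateException expected) {
            // fall through to the retry
        }
        try {
            int bytes = eager ? serializeRecyclingEagerly(batch, s) : serializeThenRecycleAll(batch, s);
            return "retry ok, bytes=" + bytes;
        } catch (NullPointerException e) {
            return "retry failed with NPE";
        }
    }

    public static void main(String[] args) {
        System.out.println("eager recycle:    " + attemptTwice(true));
        System.out.println("deferred recycle: " + attemptTwice(false));
    }
}
```

   With eager recycling the retry hits the first two, already recycled builders and fails with an NPE; with deferred recycling the retry serializes all 4 messages. The sketch leaves out the buffer index reset that the patch also performs.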
