You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by ch...@apache.org on 2021/08/29 04:16:15 UTC

[pulsar] 01/02: Fixed accessing MessageImpl after it was enqueued on user queue (#11824)

This is an automated email from the ASF dual-hosted git repository.

chenhang pushed a commit to branch branch-2.8
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 014a69c701513e53508afde69411784d82964924
Author: Matteo Merli <mm...@apache.org>
AuthorDate: Sat Aug 28 08:46:41 2021 -0700

    Fixed accessing MessageImpl after it was enqueued on user queue (#11824)
    
    (cherry picked from commit 666ad3b13cbbf35c329fb3fd433f117d6d893e0a)
---
 .../main/java/org/apache/pulsar/client/impl/ConsumerBase.java    | 9 ++++-----
 1 file changed, 4 insertions(+), 5 deletions(-)

diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
index f4002c2..d2b4f2a 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
@@ -727,8 +727,11 @@ public abstract class ConsumerBase<T> extends HandlerState implements Consumer<T
     }
 
     protected boolean enqueueMessageAndCheckBatchReceive(Message<T> message) {
+        int messageSize = message.size();
         if (canEnqueueMessage(message) && incomingMessages.offer(message)) {
-            increaseIncomingMessageSize(message);
+            // After we have enqueued the messages on `incomingMessages` queue, we cannot touch the message instance
+            // anymore, since for pooled messages, this instance was possibly already been released and recycled.
+            INCOMING_MESSAGES_SIZE_UPDATER.addAndGet(this, messageSize);
         }
         return hasEnoughMessagesForBatchReceive();
     }
@@ -970,10 +973,6 @@ public abstract class ConsumerBase<T> extends HandlerState implements Consumer<T
         return pendingBatchReceives != null && hasNextBatchReceive();
     }
 
-    protected void increaseIncomingMessageSize(final Message<?> message) {
-        INCOMING_MESSAGES_SIZE_UPDATER.addAndGet(this, message.size());
-    }
-
     protected void resetIncomingMessageSize() {
         INCOMING_MESSAGES_SIZE_UPDATER.set(this, 0);
     }