You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by ch...@apache.org on 2021/08/29 04:16:15 UTC
[pulsar] 01/02: Fixed accessing MessageImpl after it was enqueued
on user queue (#11824)
This is an automated email from the ASF dual-hosted git repository.
chenhang pushed a commit to branch branch-2.8
in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 014a69c701513e53508afde69411784d82964924
Author: Matteo Merli <mm...@apache.org>
AuthorDate: Sat Aug 28 08:46:41 2021 -0700
Fixed accessing MessageImpl after it was enqueued on user queue (#11824)
(cherry picked from commit 666ad3b13cbbf35c329fb3fd433f117d6d893e0a)
---
.../main/java/org/apache/pulsar/client/impl/ConsumerBase.java | 9 ++++-----
1 file changed, 4 insertions(+), 5 deletions(-)
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
index f4002c2..d2b4f2a 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
@@ -727,8 +727,11 @@ public abstract class ConsumerBase<T> extends HandlerState implements Consumer<T
}
protected boolean enqueueMessageAndCheckBatchReceive(Message<T> message) {
+ int messageSize = message.size();
if (canEnqueueMessage(message) && incomingMessages.offer(message)) {
- increaseIncomingMessageSize(message);
+ // After we have enqueued the message on `incomingMessages` queue, we cannot touch the message instance
+ // anymore, since for pooled messages, this instance may already have been released and recycled.
+ INCOMING_MESSAGES_SIZE_UPDATER.addAndGet(this, messageSize);
}
return hasEnoughMessagesForBatchReceive();
}
@@ -970,10 +973,6 @@ public abstract class ConsumerBase<T> extends HandlerState implements Consumer<T
return pendingBatchReceives != null && hasNextBatchReceive();
}
- protected void increaseIncomingMessageSize(final Message<?> message) {
- INCOMING_MESSAGES_SIZE_UPDATER.addAndGet(this, message.size());
- }
-
protected void resetIncomingMessageSize() {
INCOMING_MESSAGES_SIZE_UPDATER.set(this, 0);
}