You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by mm...@apache.org on 2019/05/24 16:48:12 UTC

[pulsar] branch master updated: Bugfix - release and recycle on discarded messages (#4342)

This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 71f3928  Bugfix - release and recycle on discarded messages (#4342)
71f3928 is described below

commit 71f3928722fc702f2f8699c68dd7262d53d0a630
Author: Ezequiel Lovelle <ez...@gmail.com>
AuthorDate: Fri May 24 13:48:06 2019 -0300

    Bugfix - release and recycle on discarded messages (#4342)
    
    Don't leak resources when a message is being discarded.
    
    *Modifications*
    
      - Fix missing release() and recycle() for discarded message on
        receiveIndividualMessagesFromBatch method.
      - Fix argument missing of debug logging {}-placeholder.
      - Fix unnecessary variable reference `payload` on messageReceived().
---
 .../java/org/apache/pulsar/client/impl/ConsumerImpl.java   | 14 +++++++-------
 1 file changed, 7 insertions(+), 7 deletions(-)

diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
index 8d91ca1..3426e22 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
@@ -747,17 +747,15 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
                     messageId.getEntryId());
         }
 
-        MessageMetadata msgMetadata = null;
-        ByteBuf payload = headersAndPayload;
-
         if (!verifyChecksum(headersAndPayload, messageId)) {
             // discard message with checksum error
             discardCorruptedMessage(messageId, cnx, ValidationError.ChecksumMismatch);
             return;
         }
 
+        MessageMetadata msgMetadata;
         try {
-            msgMetadata = Commands.parseMessageMetadata(payload);
+            msgMetadata = Commands.parseMessageMetadata(headersAndPayload);
         } catch (Throwable t) {
             discardCorruptedMessage(messageId, cnx, ValidationError.ChecksumMismatch);
             return;
@@ -768,15 +766,15 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
         MessageIdImpl msgId = new MessageIdImpl(messageId.getLedgerId(), messageId.getEntryId(), getPartitionIndex());
         if (acknowledgmentsGroupingTracker.isDuplicate(msgId)) {
             if (log.isDebugEnabled()) {
-                log.debug("[{}][{}] Ignoring message as it was already being acked earlier by same consumer {}/{}",
-                        topic, subscription, msgId);
+                log.debug("[{}] [{}] Ignoring message as it was already being acked earlier by same consumer {}/{}",
+                        topic, subscription, consumerName, msgId);
             }
 
             increaseAvailablePermits(cnx, numMessages);
             return;
         }
 
-        ByteBuf decryptedPayload = decryptPayloadIfNeeded(messageId, msgMetadata, payload, cnx);
+        ByteBuf decryptedPayload = decryptPayloadIfNeeded(messageId, msgMetadata, headersAndPayload, cnx);
 
         boolean isMessageUndecryptable = isMessageUndecryptable(msgMetadata);
 
@@ -950,6 +948,8 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
                         log.debug("[{}] [{}] Ignoring message from before the startMessageId", subscription,
                                 consumerName);
                     }
+                    singleMessagePayload.release();
+                    singleMessageMetadataBuilder.recycle();
 
                     ++skippedMessages;
                     continue;