You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by lh...@apache.org on 2021/06/21 07:18:22 UTC

[pulsar] branch branch-2.8 updated: [Broker] Fix direct memory leak in getLastMessageId (#10977)

This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch branch-2.8
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-2.8 by this push:
     new 02ec937  [Broker] Fix direct memory leak in getLastMessageId (#10977)
02ec937 is described below

commit 02ec9373d54cdee38baf01fdc1672c2ef20fe4fa
Author: Lari Hotari <lh...@users.noreply.github.com>
AuthorDate: Sat Jun 19 10:48:27 2021 +0300

    [Broker] Fix direct memory leak in getLastMessageId (#10977)
    
    (cherry picked from commit 7417ca87e623e56454b1d812ee957ddee511a78e)
---
 .../broker/service/persistent/PersistentTopic.java    | 19 ++++++++++++-------
 1 file changed, 12 insertions(+), 7 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index 65a526e..644b726 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -2571,13 +2571,18 @@ public class PersistentTopic extends AbstractTopic
         ledgerImpl.asyncReadEntry(position, new AsyncCallbacks.ReadEntryCallback() {
             @Override
             public void readEntryComplete(Entry entry, Object ctx) {
-                MessageMetadata metadata = Commands.parseMessageMetadata(entry.getDataBuffer());
-                if (metadata.hasNumMessagesInBatch()) {
-                    completableFuture.complete(new BatchMessageIdImpl(position.getLedgerId(), position.getEntryId(),
-                            partitionIndex, metadata.getNumMessagesInBatch() - 1));
-                } else {
-                    completableFuture
-                            .complete(new MessageIdImpl(position.getLedgerId(), position.getEntryId(), partitionIndex));
+                try {
+                    MessageMetadata metadata = Commands.parseMessageMetadata(entry.getDataBuffer());
+                    if (metadata.hasNumMessagesInBatch()) {
+                        completableFuture.complete(new BatchMessageIdImpl(position.getLedgerId(), position.getEntryId(),
+                                partitionIndex, metadata.getNumMessagesInBatch() - 1));
+                    } else {
+                        completableFuture
+                                .complete(new MessageIdImpl(position.getLedgerId(), position.getEntryId(),
+                                        partitionIndex));
+                    }
+                } finally {
+                    entry.release();
                 }
             }