You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by te...@apache.org on 2022/06/21 09:13:12 UTC

[pulsar] branch master updated: Fix `messageQueue` release message issue. (#16155)

This is an automated email from the ASF dual-hosted git repository.

technoboy pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 141c44022a2 Fix `messageQueue` release message issue. (#16155)
141c44022a2 is described below

commit 141c44022a27be2fc07eab9827cfdb168e448953
Author: Jiwei Guo <te...@apache.org>
AuthorDate: Tue Jun 21 17:13:00 2022 +0800

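    Fix `messageQueue` release message issue. (#16155)

    In `PulsarRecordCursor.close()`, the release of `messageQueue` and
    `currentMessage` is moved into the `whenComplete` callback of
    `deserializeEntries.close()`, so those messages are released only after
    that close call completes, alongside the existing `entryQueue` drain.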
---
 .../org/apache/pulsar/sql/presto/PulsarRecordCursor.java   | 14 ++++++--------
 1 file changed, 6 insertions(+), 8 deletions(-)

diff --git a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarRecordCursor.java b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarRecordCursor.java
index 1ea232203d3..558f6b47e9d 100644
--- a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarRecordCursor.java
+++ b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarRecordCursor.java
@@ -726,19 +726,17 @@ public class PulsarRecordCursor implements RecordCursor {
     public void close() {
         log.info("Closing cursor record");
 
-        if (currentMessage != null) {
-            currentMessage.release();
-        }
-
-        if (messageQueue != null) {
-            messageQueue.drain(RawMessage::release);
-        }
-
         if (deserializeEntries != null) {
             deserializeEntries.close().whenComplete((r, t) -> {
                 if (entryQueue != null) {
                     entryQueue.drain(Entry::release);
                 }
+                if (messageQueue != null) {
+                    messageQueue.drain(RawMessage::release);
+                }
+                if (currentMessage != null) {
+                    currentMessage.release();
+                }
             });
         }