You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by te...@apache.org on 2022/06/21 09:13:12 UTC
[pulsar] branch master updated: Fix `messageQueue` release message issue. (#16155)
This is an automated email from the ASF dual-hosted git repository.
technoboy pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 141c44022a2 Fix `messageQueue` release message issue. (#16155)
141c44022a2 is described below
commit 141c44022a27be2fc07eab9827cfdb168e448953
Author: Jiwei Guo <te...@apache.org>
AuthorDate: Tue Jun 21 17:13:00 2022 +0800
Fix `messageQueue` release message issue. (#16155)
---
.../org/apache/pulsar/sql/presto/PulsarRecordCursor.java | 14 ++++++--------
1 file changed, 6 insertions(+), 8 deletions(-)
diff --git a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarRecordCursor.java b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarRecordCursor.java
index 1ea232203d3..558f6b47e9d 100644
--- a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarRecordCursor.java
+++ b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarRecordCursor.java
@@ -726,19 +726,17 @@ public class PulsarRecordCursor implements RecordCursor {
public void close() {
log.info("Closing cursor record");
- if (currentMessage != null) {
- currentMessage.release();
- }
-
- if (messageQueue != null) {
- messageQueue.drain(RawMessage::release);
- }
-
if (deserializeEntries != null) {
deserializeEntries.close().whenComplete((r, t) -> {
if (entryQueue != null) {
entryQueue.drain(Entry::release);
}
+ if (messageQueue != null) {
+ messageQueue.drain(RawMessage::release);
+ }
+ if (currentMessage != null) {
+ currentMessage.release();
+ }
});
}