You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by bl...@apache.org on 2023/02/17 11:12:09 UTC

[pulsar-dotpulsar] branch master updated: Fix memory leak in Producer (#140)

This is an automated email from the ASF dual-hosted git repository.

blankensteiner pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-dotpulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 023ffe2  Fix memory leak in Producer (#140)
023ffe2 is described below

commit 023ffe2b09dda4b2dd06da09ba790f1d0e6424d1
Author: Kristian Andersen <ka...@users.noreply.github.com>
AuthorDate: Fri Feb 17 12:12:04 2023 +0100

    Fix memory leak in Producer (#140)
    
    * Fix memory leak in Producer
    
    The cancellation token registration for the Send operation was not disposed.
    
    * Found one more
    
    Fixed an additional memory leak.
    Ensure the token registration is disposed even if the TaskCompletionSource task is cancelled.
---
 CHANGELOG.md                                   |  6 ++++++
 src/DotPulsar/Internal/AsyncQueueWithCursor.cs | 15 ++++++++++++---
 src/DotPulsar/Internal/Producer.cs             | 11 +++++++++--
 3 files changed, 27 insertions(+), 5 deletions(-)

diff --git a/CHANGELOG.md b/CHANGELOG.md
index 3047925..87ff85a 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -4,6 +4,12 @@ All notable changes to this project will be documented in this file.
 
 The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/) and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).
 
+## [2.10.2] - 2023-02-17
+
+### Fixed
+
+- Fixed a memory leak introduced in 2.8.0 with the internal rewrite of producer functionality
+
 ## [2.10.1] - 2023-02-15
 
 ### Fixed
diff --git a/src/DotPulsar/Internal/AsyncQueueWithCursor.cs b/src/DotPulsar/Internal/AsyncQueueWithCursor.cs
index 1765c58..f7f3824 100644
--- a/src/DotPulsar/Internal/AsyncQueueWithCursor.cs
+++ b/src/DotPulsar/Internal/AsyncQueueWithCursor.cs
@@ -221,7 +221,8 @@ public sealed class AsyncQueueWithCursor<T> : IAsyncDisposable where T : IDispos
     /// </summary>
     public async Task WaitForEmpty(CancellationToken cancellationToken)
     {
-        var tcs = new TaskCompletionSource<object>();
+        TaskCompletionSource<object> tcs;
+        CancellationTokenRegistration registration;
         lock (_queue)
         {
             ThrowIfDisposed();
@@ -229,11 +230,19 @@ public sealed class AsyncQueueWithCursor<T> : IAsyncDisposable where T : IDispos
             if (_queue.Count == 0)
                 return;
 
-            cancellationToken.Register(() => tcs.TrySetCanceled());
+            tcs = new TaskCompletionSource<object>();
+            registration = cancellationToken.Register(() => tcs.TrySetCanceled(cancellationToken));
             _queueEmptyTcs.Add(tcs);
         }
 
-        await tcs.Task.ConfigureAwait(false);
+        try
+        {
+            await tcs.Task.ConfigureAwait(false);
+        }
+        finally
+        {
+            registration.Dispose();
+        }
     }
 
     public async ValueTask DisposeAsync()
diff --git a/src/DotPulsar/Internal/Producer.cs b/src/DotPulsar/Internal/Producer.cs
index be92548..f0a8926 100644
--- a/src/DotPulsar/Internal/Producer.cs
+++ b/src/DotPulsar/Internal/Producer.cs
@@ -243,7 +243,7 @@ public sealed class Producer<TMessage> : IProducer<TMessage>, IRegisterEvent
     public async ValueTask<MessageId> Send(MessageMetadata metadata, TMessage message, CancellationToken cancellationToken)
     {
         var tcs = new TaskCompletionSource<MessageId>();
-        cancellationToken.Register(() => tcs.TrySetCanceled(cancellationToken));
+        var registration = cancellationToken.Register(() => tcs.TrySetCanceled(cancellationToken));
 
         ValueTask OnMessageSent(MessageId messageId)
         {
@@ -257,7 +257,14 @@ public sealed class Producer<TMessage> : IProducer<TMessage>, IRegisterEvent
 
         await InternalSend(metadata, message, true, OnMessageSent, cancellationToken).ConfigureAwait(false);
 
-        return await tcs.Task.ConfigureAwait(false);
+        try
+        {
+            return await tcs.Task.ConfigureAwait(false);
+        }
+        finally
+        {
+            registration.Dispose();
+        }
     }
 
     public async ValueTask Enqueue(MessageMetadata metadata, TMessage message, Func<MessageId, ValueTask>? onMessageSent = default, CancellationToken cancellationToken = default)