You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by bl...@apache.org on 2023/02/17 11:12:09 UTC
[pulsar-dotpulsar] branch master updated: Fix memory leak in Producer (#140)
This is an automated email from the ASF dual-hosted git repository.
blankensteiner pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-dotpulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 023ffe2 Fix memory leak in Producer (#140)
023ffe2 is described below
commit 023ffe2b09dda4b2dd06da09ba790f1d0e6424d1
Author: Kristian Andersen <ka...@users.noreply.github.com>
AuthorDate: Fri Feb 17 12:12:04 2023 +0100
Fix memory leak in Producer (#140)
* Fix memory leak in Producer
Cancellation token registration for Send operation was not disposed
* Found one more
Fixed additional memory leak
Ensure token registration is disposed even if TaskCompletionSource task is cancelled.
---
CHANGELOG.md | 6 ++++++
src/DotPulsar/Internal/AsyncQueueWithCursor.cs | 15 ++++++++++++---
src/DotPulsar/Internal/Producer.cs | 11 +++++++++--
3 files changed, 27 insertions(+), 5 deletions(-)
diff --git a/CHANGELOG.md b/CHANGELOG.md
index 3047925..87ff85a 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -4,6 +4,12 @@ All notable changes to this project will be documented in this file.
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/) and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).
+## [2.10.2] - 2023-02-17
+
+### Fixed
+
+- Fixed a memory leak introduced in 2.8.0 with internal rewrite of producer functionality
+
## [2.10.1] - 2023-02-15
### Fixed
diff --git a/src/DotPulsar/Internal/AsyncQueueWithCursor.cs b/src/DotPulsar/Internal/AsyncQueueWithCursor.cs
index 1765c58..f7f3824 100644
--- a/src/DotPulsar/Internal/AsyncQueueWithCursor.cs
+++ b/src/DotPulsar/Internal/AsyncQueueWithCursor.cs
@@ -221,7 +221,8 @@ public sealed class AsyncQueueWithCursor<T> : IAsyncDisposable where T : IDispos
/// </summary>
public async Task WaitForEmpty(CancellationToken cancellationToken)
{
- var tcs = new TaskCompletionSource<object>();
+ TaskCompletionSource<object> tcs;
+ CancellationTokenRegistration registration;
lock (_queue)
{
ThrowIfDisposed();
@@ -229,11 +230,19 @@ public sealed class AsyncQueueWithCursor<T> : IAsyncDisposable where T : IDispos
if (_queue.Count == 0)
return;
- cancellationToken.Register(() => tcs.TrySetCanceled());
+ tcs = new TaskCompletionSource<object>();
+ registration = cancellationToken.Register(() => tcs.TrySetCanceled(cancellationToken));
_queueEmptyTcs.Add(tcs);
}
- await tcs.Task.ConfigureAwait(false);
+ try
+ {
+ await tcs.Task.ConfigureAwait(false);
+ }
+ finally
+ {
+ registration.Dispose();
+ }
}
public async ValueTask DisposeAsync()
diff --git a/src/DotPulsar/Internal/Producer.cs b/src/DotPulsar/Internal/Producer.cs
index be92548..f0a8926 100644
--- a/src/DotPulsar/Internal/Producer.cs
+++ b/src/DotPulsar/Internal/Producer.cs
@@ -243,7 +243,7 @@ public sealed class Producer<TMessage> : IProducer<TMessage>, IRegisterEvent
public async ValueTask<MessageId> Send(MessageMetadata metadata, TMessage message, CancellationToken cancellationToken)
{
var tcs = new TaskCompletionSource<MessageId>();
- cancellationToken.Register(() => tcs.TrySetCanceled(cancellationToken));
+ var registration = cancellationToken.Register(() => tcs.TrySetCanceled(cancellationToken));
ValueTask OnMessageSent(MessageId messageId)
{
@@ -257,7 +257,14 @@ public sealed class Producer<TMessage> : IProducer<TMessage>, IRegisterEvent
await InternalSend(metadata, message, true, OnMessageSent, cancellationToken).ConfigureAwait(false);
- return await tcs.Task.ConfigureAwait(false);
+ try
+ {
+ return await tcs.Task.ConfigureAwait(false);
+ }
+ finally
+ {
+ registration.Dispose();
+ }
}
public async ValueTask Enqueue(MessageMetadata metadata, TMessage message, Func<MessageId, ValueTask>? onMessageSent = default, CancellationToken cancellationToken = default)