You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by bl...@apache.org on 2020/03/24 11:50:52 UTC
[pulsar-dotpulsar] branch master updated: Have all implement dispose guard the same way and always retry internal XDisposedExceptions.
This is an automated email from the ASF dual-hosted git repository.
blankensteiner pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-dotpulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 52ea9ab Have all implement dispose guard the same way and always retry internal XDisposedExceptions.
52ea9ab is described below
commit 52ea9ab148f10957a639c843d7a202355a374884
Author: Daniel Blankensteiner <db...@vmail.dk>
AuthorDate: Tue Mar 24 12:50:40 2020 +0100
Have all implement dispose guard the same way and always retry internal XDisposedExceptions.
---
DotPulsar.sln | 31 +++++++++++++++-
samples/DotPulsar.Samples.sln | 43 ----------------------
.../DotPulsar.Stress.Tests.csproj | 2 +-
src/DotPulsar.Tests/Internal/AsyncLockTests.cs | 9 +++--
.../Exceptions/ConsumerDisposedException.cs | 10 +++++
.../Exceptions/ProducerDisposedException.cs | 10 +++++
.../Exceptions/PulsarClientDisposedException.cs | 9 +++++
.../Exceptions/ReaderDisposedException.cs | 10 +++++
src/DotPulsar/Internal/AsyncLock.cs | 18 +++++----
src/DotPulsar/Internal/AsyncQueue.cs | 20 +++++-----
src/DotPulsar/Internal/Connection.cs | 33 +++++++++++++++++
src/DotPulsar/Internal/Consumer.cs | 3 +-
src/DotPulsar/Internal/DefaultExceptionHandler.cs | 4 ++
.../Exceptions/AsyncLockDisposedException.cs | 9 +++++
.../Exceptions/AsyncQueueDisposedException.cs | 9 +++++
.../Exceptions/ConnectionDisposedException.cs | 9 +++++
.../Exceptions/PulsarStreamDisposedException.cs | 9 +++++
src/DotPulsar/Internal/Producer.cs | 3 +-
src/DotPulsar/Internal/PulsarStream.cs | 3 +-
src/DotPulsar/Internal/Reader.cs | 3 +-
src/DotPulsar/PulsarClient.cs | 3 +-
21 files changed, 179 insertions(+), 71 deletions(-)
diff --git a/DotPulsar.sln b/DotPulsar.sln
index 97606de..4e4f131 100644
--- a/DotPulsar.sln
+++ b/DotPulsar.sln
@@ -7,7 +7,17 @@ Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "DotPulsar", "src\DotPulsar\
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "DotPulsar.Tests", "src\DotPulsar.Tests\DotPulsar.Tests.csproj", "{B3FCD2D5-8009-4281-ACDB-6A7BC99606B4}"
EndProject
-Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "DotPulsar.Stress.Tests", "src\DotPulsar.Stress.Tests\DotPulsar.Stress.Tests.csproj", "{DDC68F51-DE1B-4794-9EE7-392C303CF5EB}"
+Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "DotPulsar.Stress.Tests", "src\DotPulsar.Stress.Tests\DotPulsar.Stress.Tests.csproj", "{DDC68F51-DE1B-4794-9EE7-392C303CF5EB}"
+EndProject
+Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "Tests", "Tests", "{E1C932A9-6D4C-4DDF-8922-BE7B71F12F1C}"
+EndProject
+Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "Samples", "Samples", "{E7106D0F-B255-4631-9FB8-734FC5748FA9}"
+EndProject
+Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Consuming", "samples\Consuming\Consuming.csproj", "{2A810EB9-45CE-4593-8E4C-026E0CBB3C42}"
+EndProject
+Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Producing", "samples\Producing\Producing.csproj", "{14934BED-A222-47B2-A58A-CFC4AAB89B49}"
+EndProject
+Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Reading", "samples\Reading\Reading.csproj", "{6D44683B-865C-4D15-9F0A-1A8441354589}"
EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
@@ -27,10 +37,29 @@ Global
{DDC68F51-DE1B-4794-9EE7-392C303CF5EB}.Debug|Any CPU.Build.0 = Debug|Any CPU
{DDC68F51-DE1B-4794-9EE7-392C303CF5EB}.Release|Any CPU.ActiveCfg = Release|Any CPU
{DDC68F51-DE1B-4794-9EE7-392C303CF5EB}.Release|Any CPU.Build.0 = Release|Any CPU
+ {2A810EB9-45CE-4593-8E4C-026E0CBB3C42}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
+ {2A810EB9-45CE-4593-8E4C-026E0CBB3C42}.Debug|Any CPU.Build.0 = Debug|Any CPU
+ {2A810EB9-45CE-4593-8E4C-026E0CBB3C42}.Release|Any CPU.ActiveCfg = Release|Any CPU
+ {2A810EB9-45CE-4593-8E4C-026E0CBB3C42}.Release|Any CPU.Build.0 = Release|Any CPU
+ {14934BED-A222-47B2-A58A-CFC4AAB89B49}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
+ {14934BED-A222-47B2-A58A-CFC4AAB89B49}.Debug|Any CPU.Build.0 = Debug|Any CPU
+ {14934BED-A222-47B2-A58A-CFC4AAB89B49}.Release|Any CPU.ActiveCfg = Release|Any CPU
+ {14934BED-A222-47B2-A58A-CFC4AAB89B49}.Release|Any CPU.Build.0 = Release|Any CPU
+ {6D44683B-865C-4D15-9F0A-1A8441354589}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
+ {6D44683B-865C-4D15-9F0A-1A8441354589}.Debug|Any CPU.Build.0 = Debug|Any CPU
+ {6D44683B-865C-4D15-9F0A-1A8441354589}.Release|Any CPU.ActiveCfg = Release|Any CPU
+ {6D44683B-865C-4D15-9F0A-1A8441354589}.Release|Any CPU.Build.0 = Release|Any CPU
EndGlobalSection
GlobalSection(SolutionProperties) = preSolution
HideSolutionNode = FALSE
EndGlobalSection
+ GlobalSection(NestedProjects) = preSolution
+ {B3FCD2D5-8009-4281-ACDB-6A7BC99606B4} = {E1C932A9-6D4C-4DDF-8922-BE7B71F12F1C}
+ {DDC68F51-DE1B-4794-9EE7-392C303CF5EB} = {E1C932A9-6D4C-4DDF-8922-BE7B71F12F1C}
+ {2A810EB9-45CE-4593-8E4C-026E0CBB3C42} = {E7106D0F-B255-4631-9FB8-734FC5748FA9}
+ {14934BED-A222-47B2-A58A-CFC4AAB89B49} = {E7106D0F-B255-4631-9FB8-734FC5748FA9}
+ {6D44683B-865C-4D15-9F0A-1A8441354589} = {E7106D0F-B255-4631-9FB8-734FC5748FA9}
+ EndGlobalSection
GlobalSection(ExtensibilityGlobals) = postSolution
SolutionGuid = {88355922-E70A-4B73-B7F8-ABF8F2B59789}
EndGlobalSection
diff --git a/samples/DotPulsar.Samples.sln b/samples/DotPulsar.Samples.sln
deleted file mode 100644
index 6035487..0000000
--- a/samples/DotPulsar.Samples.sln
+++ /dev/null
@@ -1,43 +0,0 @@
-
-Microsoft Visual Studio Solution File, Format Version 12.00
-# Visual Studio Version 16
-VisualStudioVersion = 16.0.29806.167
-MinimumVisualStudioVersion = 10.0.40219.1
-Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Consuming", "Consuming\Consuming.csproj", "{A6EE5186-9893-4915-B439-1A85C2FB92C1}"
-EndProject
-Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Reading", "Reading\Reading.csproj", "{A2480730-3529-4EA5-9599-374D15371901}"
-EndProject
-Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Producing", "Producing\Producing.csproj", "{132924A6-B793-4EAE-8574-C8D7E3B38C0D}"
-EndProject
-Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "DotPulsar", "..\src\DotPulsar\DotPulsar.csproj", "{BA92EB0A-F71C-4CF6-8719-70CCE149EF4D}"
-EndProject
-Global
- GlobalSection(SolutionConfigurationPlatforms) = preSolution
- Debug|Any CPU = Debug|Any CPU
- Release|Any CPU = Release|Any CPU
- EndGlobalSection
- GlobalSection(ProjectConfigurationPlatforms) = postSolution
- {A6EE5186-9893-4915-B439-1A85C2FB92C1}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
- {A6EE5186-9893-4915-B439-1A85C2FB92C1}.Debug|Any CPU.Build.0 = Debug|Any CPU
- {A6EE5186-9893-4915-B439-1A85C2FB92C1}.Release|Any CPU.ActiveCfg = Release|Any CPU
- {A6EE5186-9893-4915-B439-1A85C2FB92C1}.Release|Any CPU.Build.0 = Release|Any CPU
- {A2480730-3529-4EA5-9599-374D15371901}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
- {A2480730-3529-4EA5-9599-374D15371901}.Debug|Any CPU.Build.0 = Debug|Any CPU
- {A2480730-3529-4EA5-9599-374D15371901}.Release|Any CPU.ActiveCfg = Release|Any CPU
- {A2480730-3529-4EA5-9599-374D15371901}.Release|Any CPU.Build.0 = Release|Any CPU
- {132924A6-B793-4EAE-8574-C8D7E3B38C0D}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
- {132924A6-B793-4EAE-8574-C8D7E3B38C0D}.Debug|Any CPU.Build.0 = Debug|Any CPU
- {132924A6-B793-4EAE-8574-C8D7E3B38C0D}.Release|Any CPU.ActiveCfg = Release|Any CPU
- {132924A6-B793-4EAE-8574-C8D7E3B38C0D}.Release|Any CPU.Build.0 = Release|Any CPU
- {BA92EB0A-F71C-4CF6-8719-70CCE149EF4D}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
- {BA92EB0A-F71C-4CF6-8719-70CCE149EF4D}.Debug|Any CPU.Build.0 = Debug|Any CPU
- {BA92EB0A-F71C-4CF6-8719-70CCE149EF4D}.Release|Any CPU.ActiveCfg = Release|Any CPU
- {BA92EB0A-F71C-4CF6-8719-70CCE149EF4D}.Release|Any CPU.Build.0 = Release|Any CPU
- EndGlobalSection
- GlobalSection(SolutionProperties) = preSolution
- HideSolutionNode = FALSE
- EndGlobalSection
- GlobalSection(ExtensibilityGlobals) = postSolution
- SolutionGuid = {DD71ECF8-282A-4752-AB26-6290B85B709D}
- EndGlobalSection
-EndGlobal
diff --git a/src/DotPulsar.Stress.Tests/DotPulsar.Stress.Tests.csproj b/src/DotPulsar.Stress.Tests/DotPulsar.Stress.Tests.csproj
index d04a824..77ce962 100644
--- a/src/DotPulsar.Stress.Tests/DotPulsar.Stress.Tests.csproj
+++ b/src/DotPulsar.Stress.Tests/DotPulsar.Stress.Tests.csproj
@@ -16,7 +16,7 @@
<PrivateAssets>all</PrivateAssets>
<IncludeAssets>runtime; build; native; contentfiles; analyzers; buildtransitive</IncludeAssets>
</PackageReference>
- <PackageReference Include="FluentAssertions" Version="5.10.2" />
+ <PackageReference Include="FluentAssertions" Version="5.10.3" />
</ItemGroup>
<ItemGroup>
diff --git a/src/DotPulsar.Tests/Internal/AsyncLockTests.cs b/src/DotPulsar.Tests/Internal/AsyncLockTests.cs
index 766e9ad..fa151e8 100644
--- a/src/DotPulsar.Tests/Internal/AsyncLockTests.cs
+++ b/src/DotPulsar.Tests/Internal/AsyncLockTests.cs
@@ -13,6 +13,7 @@
*/
using DotPulsar.Internal;
+using DotPulsar.Internal.Exceptions;
using System;
using System.Threading;
using System.Threading.Tasks;
@@ -59,7 +60,7 @@ namespace DotPulsar.Tests.Internal
}
[Fact]
- public async Task Lock_GivenLockIsDisposed_ShouldThrowObjectDisposedException()
+ public async Task Lock_GivenLockIsDisposed_ShouldThrowAsyncLockDisposedException()
{
//Arrange
var sut = new AsyncLock();
@@ -69,11 +70,11 @@ namespace DotPulsar.Tests.Internal
var exception = await Record.ExceptionAsync(() => sut.Lock(CancellationToken.None));
//Assert
- Assert.IsType<ObjectDisposedException>(exception);
+ Assert.IsType<AsyncLockDisposedException>(exception);
}
[Fact]
- public async Task Lock_GivenLockIsDisposedWhileAwaitingLock_ShouldThrowObjectDisposedException()
+ public async Task Lock_GivenLockIsDisposedWhileAwaitingLock_ShouldThrowTaskCanceledException()
{
//Arrange
var sut = new AsyncLock();
@@ -85,7 +86,7 @@ namespace DotPulsar.Tests.Internal
var exception = await Record.ExceptionAsync(() => awaiting);
//Assert
- Assert.IsType<ObjectDisposedException>(exception);
+ Assert.IsType<TaskCanceledException>(exception);
//Annihilate
await sut.DisposeAsync();
diff --git a/src/DotPulsar/Exceptions/ConsumerDisposedException.cs b/src/DotPulsar/Exceptions/ConsumerDisposedException.cs
new file mode 100644
index 0000000..f6079bd
--- /dev/null
+++ b/src/DotPulsar/Exceptions/ConsumerDisposedException.cs
@@ -0,0 +1,10 @@
+using DotPulsar.Internal;
+using System;
+
+namespace DotPulsar.Exceptions
+{
+ public sealed class ConsumerDisposedException : ObjectDisposedException
+ {
+ public ConsumerDisposedException() : base(typeof(Consumer).FullName) { }
+ }
+}
diff --git a/src/DotPulsar/Exceptions/ProducerDisposedException.cs b/src/DotPulsar/Exceptions/ProducerDisposedException.cs
new file mode 100644
index 0000000..c881b45
--- /dev/null
+++ b/src/DotPulsar/Exceptions/ProducerDisposedException.cs
@@ -0,0 +1,10 @@
+using DotPulsar.Internal;
+using System;
+
+namespace DotPulsar.Exceptions
+{
+ public sealed class ProducerDisposedException : ObjectDisposedException
+ {
+ public ProducerDisposedException() : base(typeof(Producer).FullName) { }
+ }
+}
diff --git a/src/DotPulsar/Exceptions/PulsarClientDisposedException.cs b/src/DotPulsar/Exceptions/PulsarClientDisposedException.cs
new file mode 100644
index 0000000..f4a4f00
--- /dev/null
+++ b/src/DotPulsar/Exceptions/PulsarClientDisposedException.cs
@@ -0,0 +1,9 @@
+using System;
+
+namespace DotPulsar.Exceptions
+{
+ public sealed class PulsarClientDisposedException : ObjectDisposedException
+ {
+ public PulsarClientDisposedException() : base(typeof(PulsarClient).FullName) { }
+ }
+}
diff --git a/src/DotPulsar/Exceptions/ReaderDisposedException.cs b/src/DotPulsar/Exceptions/ReaderDisposedException.cs
new file mode 100644
index 0000000..ee02457
--- /dev/null
+++ b/src/DotPulsar/Exceptions/ReaderDisposedException.cs
@@ -0,0 +1,10 @@
+using DotPulsar.Internal;
+using System;
+
+namespace DotPulsar.Exceptions
+{
+ public sealed class ReaderDisposedException : ObjectDisposedException
+ {
+ public ReaderDisposedException() : base(typeof(Reader).FullName) { }
+ }
+}
diff --git a/src/DotPulsar/Internal/AsyncLock.cs b/src/DotPulsar/Internal/AsyncLock.cs
index de04523..592030b 100644
--- a/src/DotPulsar/Internal/AsyncLock.cs
+++ b/src/DotPulsar/Internal/AsyncLock.cs
@@ -12,6 +12,7 @@
* limitations under the License.
*/
+using DotPulsar.Internal.Exceptions;
using System;
using System.Collections.Generic;
using System.Threading;
@@ -25,7 +26,7 @@ namespace DotPulsar.Internal
private readonly SemaphoreSlim _semaphoreSlim;
private readonly Releaser _releaser;
private readonly Task<IDisposable> _completedTask;
- private bool _isDisposed;
+ private int _isDisposed;
public AsyncLock()
{
@@ -33,7 +34,6 @@ namespace DotPulsar.Internal
_semaphoreSlim = new SemaphoreSlim(1, 1);
_releaser = new Releaser(Release);
_completedTask = Task.FromResult((IDisposable)_releaser);
- _isDisposed = false;
}
public Task<IDisposable> Lock(CancellationToken cancellationToken)
@@ -42,8 +42,7 @@ namespace DotPulsar.Internal
lock (_pending)
{
- if (_isDisposed)
- throw new ObjectDisposedException(nameof(AsyncLock));
+ ThrowIfDisposed();
if (_semaphoreSlim.CurrentCount == 1) //Lock is free
{
@@ -65,14 +64,11 @@ namespace DotPulsar.Internal
{
lock (_pending)
{
- if (_isDisposed)
+ if (Interlocked.Exchange(ref _isDisposed, 1) != 0)
return;
- _isDisposed = true;
-
foreach (var pending in _pending)
{
- pending.SetException(new ObjectDisposedException(nameof(AsyncLock)));
pending.Dispose();
}
@@ -118,6 +114,12 @@ namespace DotPulsar.Internal
}
}
+ private void ThrowIfDisposed()
+ {
+ if (_isDisposed != 0)
+ throw new AsyncLockDisposedException();
+ }
+
private class Releaser : IDisposable
{
private readonly Action _release;
diff --git a/src/DotPulsar/Internal/AsyncQueue.cs b/src/DotPulsar/Internal/AsyncQueue.cs
index a6c739d..70519de 100644
--- a/src/DotPulsar/Internal/AsyncQueue.cs
+++ b/src/DotPulsar/Internal/AsyncQueue.cs
@@ -13,6 +13,7 @@
*/
using DotPulsar.Internal.Abstractions;
+using DotPulsar.Internal.Exceptions;
using System;
using System.Collections.Generic;
using System.Threading;
@@ -25,22 +26,20 @@ namespace DotPulsar.Internal
private readonly object _lock;
private readonly Queue<T> _queue;
private readonly LinkedList<CancelableCompletionSource<T>> _pendingDequeues;
- private bool _isDisposed;
+ private int _isDisposed;
public AsyncQueue()
{
_lock = new object();
_queue = new Queue<T>();
_pendingDequeues = new LinkedList<CancelableCompletionSource<T>>();
- _isDisposed = false;
}
public void Enqueue(T item)
{
lock (_lock)
{
- if (_isDisposed)
- throw new ObjectDisposedException(nameof(AsyncQueue<T>));
+ ThrowIfDisposed();
if (_pendingDequeues.Count > 0)
{
@@ -59,8 +58,7 @@ namespace DotPulsar.Internal
lock (_lock)
{
- if (_isDisposed)
- throw new ObjectDisposedException(nameof(AsyncQueue<T>));
+ ThrowIfDisposed();
if (_queue.Count > 0)
return new ValueTask<T>(_queue.Dequeue());
@@ -76,11 +74,9 @@ namespace DotPulsar.Internal
{
lock (_lock)
{
- if (_isDisposed)
+ if (Interlocked.Exchange(ref _isDisposed, 1) != 0)
return;
- _isDisposed = true;
-
foreach (var pendingDequeue in _pendingDequeues)
pendingDequeue.Dispose();
@@ -101,5 +97,11 @@ namespace DotPulsar.Internal
catch { }
}
}
+
+ private void ThrowIfDisposed()
+ {
+ if (_isDisposed != 0)
+ throw new AsyncQueueDisposedException();
+ }
}
}
diff --git a/src/DotPulsar/Internal/Connection.cs b/src/DotPulsar/Internal/Connection.cs
index eb27e11..4e168df 100644
--- a/src/DotPulsar/Internal/Connection.cs
+++ b/src/DotPulsar/Internal/Connection.cs
@@ -13,6 +13,7 @@
*/
using DotPulsar.Internal.Abstractions;
+using DotPulsar.Internal.Exceptions;
using DotPulsar.Internal.Extensions;
using DotPulsar.Internal.PulsarApi;
using System.Threading;
@@ -27,6 +28,7 @@ namespace DotPulsar.Internal
private readonly RequestResponseHandler _requestResponseHandler;
private readonly PingPongHandler _pingPongHandler;
private readonly IPulsarStream _stream;
+ private int _isDisposed;
public Connection(IPulsarStream stream)
{
@@ -39,6 +41,8 @@ namespace DotPulsar.Internal
public async ValueTask<bool> HasChannels(CancellationToken cancellationToken)
{
+ ThrowIfDisposed();
+
using (await _lock.Lock(cancellationToken))
{
return _channelManager.HasChannels();
@@ -47,6 +51,8 @@ namespace DotPulsar.Internal
public async Task<ProducerResponse> Send(CommandProducer command, IChannel channel, CancellationToken cancellationToken)
{
+ ThrowIfDisposed();
+
Task<ProducerResponse>? responseTask = null;
using (await _lock.Lock(cancellationToken))
@@ -63,6 +69,8 @@ namespace DotPulsar.Internal
public async Task<SubscribeResponse> Send(CommandSubscribe command, IChannel channel, CancellationToken cancellationToken)
{
+ ThrowIfDisposed();
+
Task<SubscribeResponse>? responseTask = null;
using (await _lock.Lock(cancellationToken))
@@ -91,6 +99,8 @@ namespace DotPulsar.Internal
public async Task<BaseCommand> Send(CommandUnsubscribe command, CancellationToken cancellationToken)
{
+ ThrowIfDisposed();
+
Task<BaseCommand>? responseTask = null;
using (await _lock.Lock(cancellationToken))
@@ -119,6 +129,8 @@ namespace DotPulsar.Internal
public async Task<BaseCommand> Send(CommandCloseProducer command, CancellationToken cancellationToken)
{
+ ThrowIfDisposed();
+
Task<BaseCommand>? responseTask = null;
using (await _lock.Lock(cancellationToken))
@@ -135,6 +147,8 @@ namespace DotPulsar.Internal
public async Task<BaseCommand> Send(CommandCloseConsumer command, CancellationToken cancellationToken)
{
+ ThrowIfDisposed();
+
Task<BaseCommand>? responseTask = null;
using (await _lock.Lock(cancellationToken))
@@ -151,7 +165,10 @@ namespace DotPulsar.Internal
public async Task<BaseCommand> Send(SendPackage command, CancellationToken cancellationToken)
{
+ ThrowIfDisposed();
+
Task<BaseCommand>? response = null;
+
using (await _lock.Lock(cancellationToken))
{
var baseCommand = command.Command.AsBaseCommand();
@@ -159,23 +176,30 @@ namespace DotPulsar.Internal
var sequence = Serializer.Serialize(baseCommand, command.Metadata, command.Payload);
await _stream.Send(sequence);
}
+
return await response;
}
private async Task<BaseCommand> SendRequestResponse(BaseCommand command, CancellationToken cancellationToken)
{
+ ThrowIfDisposed();
+
Task<BaseCommand>? response = null;
+
using (await _lock.Lock(cancellationToken))
{
response = _requestResponseHandler.Outgoing(command);
var sequence = Serializer.Serialize(command);
await _stream.Send(sequence);
}
+
return await response;
}
private async Task Send(BaseCommand command, CancellationToken cancellationToken)
{
+ ThrowIfDisposed();
+
using (await _lock.Lock(cancellationToken))
{
var sequence = Serializer.Serialize(command);
@@ -225,10 +249,19 @@ namespace DotPulsar.Internal
public async ValueTask DisposeAsync()
{
+ if (Interlocked.Exchange(ref _isDisposed, 1) != 0)
+ return;
+
await _lock.DisposeAsync();
_requestResponseHandler.Dispose();
_channelManager.Dispose();
await _stream.DisposeAsync();
}
+
+ private void ThrowIfDisposed()
+ {
+ if (_isDisposed != 0)
+ throw new ConnectionDisposedException();
+ }
}
}
diff --git a/src/DotPulsar/Internal/Consumer.cs b/src/DotPulsar/Internal/Consumer.cs
index 9b623ad..2f576b7 100644
--- a/src/DotPulsar/Internal/Consumer.cs
+++ b/src/DotPulsar/Internal/Consumer.cs
@@ -13,6 +13,7 @@
*/
using DotPulsar.Abstractions;
+using DotPulsar.Exceptions;
using DotPulsar.Internal.Abstractions;
using DotPulsar.Internal.Events;
using DotPulsar.Internal.PulsarApi;
@@ -135,7 +136,7 @@ namespace DotPulsar.Internal
private void ThrowIfDisposed()
{
if (_isDisposed != 0)
- throw new ObjectDisposedException(GetType().FullName);
+ throw new ConsumerDisposedException();
}
}
}
diff --git a/src/DotPulsar/Internal/DefaultExceptionHandler.cs b/src/DotPulsar/Internal/DefaultExceptionHandler.cs
index cb0e6b0..bf182b2 100644
--- a/src/DotPulsar/Internal/DefaultExceptionHandler.cs
+++ b/src/DotPulsar/Internal/DefaultExceptionHandler.cs
@@ -44,6 +44,10 @@ namespace DotPulsar.Internal
case TooManyRequestsException _: return FaultAction.Retry;
case ChannelNotReadyException _: return FaultAction.Retry;
case ServiceNotReadyException _: return FaultAction.Retry;
+ case ConnectionDisposedException _: return FaultAction.Retry;
+ case AsyncLockDisposedException _: return FaultAction.Retry;
+ case PulsarStreamDisposedException _: return FaultAction.Retry;
+ case AsyncQueueDisposedException _: return FaultAction.Retry;
case OperationCanceledException _: return cancellationToken.IsCancellationRequested ? FaultAction.Rethrow : FaultAction.Retry;
case DotPulsarException _: return FaultAction.Rethrow;
case SocketException socketException:
diff --git a/src/DotPulsar/Internal/Exceptions/AsyncLockDisposedException.cs b/src/DotPulsar/Internal/Exceptions/AsyncLockDisposedException.cs
new file mode 100644
index 0000000..b3dbbdd
--- /dev/null
+++ b/src/DotPulsar/Internal/Exceptions/AsyncLockDisposedException.cs
@@ -0,0 +1,9 @@
+using System;
+
+namespace DotPulsar.Internal.Exceptions
+{
+ public sealed class AsyncLockDisposedException : ObjectDisposedException
+ {
+ public AsyncLockDisposedException() : base(typeof(AsyncLock).FullName) { }
+ }
+}
diff --git a/src/DotPulsar/Internal/Exceptions/AsyncQueueDisposedException.cs b/src/DotPulsar/Internal/Exceptions/AsyncQueueDisposedException.cs
new file mode 100644
index 0000000..27156cd
--- /dev/null
+++ b/src/DotPulsar/Internal/Exceptions/AsyncQueueDisposedException.cs
@@ -0,0 +1,9 @@
+using System;
+
+namespace DotPulsar.Internal.Exceptions
+{
+ public sealed class AsyncQueueDisposedException : ObjectDisposedException
+ {
+ public AsyncQueueDisposedException() : base(typeof(AsyncQueue<>).FullName) { }
+ }
+}
diff --git a/src/DotPulsar/Internal/Exceptions/ConnectionDisposedException.cs b/src/DotPulsar/Internal/Exceptions/ConnectionDisposedException.cs
new file mode 100644
index 0000000..e445908
--- /dev/null
+++ b/src/DotPulsar/Internal/Exceptions/ConnectionDisposedException.cs
@@ -0,0 +1,9 @@
+using System;
+
+namespace DotPulsar.Internal.Exceptions
+{
+ public sealed class ConnectionDisposedException : ObjectDisposedException
+ {
+ public ConnectionDisposedException() : base(typeof(Connection).FullName) { }
+ }
+}
diff --git a/src/DotPulsar/Internal/Exceptions/PulsarStreamDisposedException.cs b/src/DotPulsar/Internal/Exceptions/PulsarStreamDisposedException.cs
new file mode 100644
index 0000000..8bd4567
--- /dev/null
+++ b/src/DotPulsar/Internal/Exceptions/PulsarStreamDisposedException.cs
@@ -0,0 +1,9 @@
+using System;
+
+namespace DotPulsar.Internal.Exceptions
+{
+ public sealed class PulsarStreamDisposedException : ObjectDisposedException
+ {
+ public PulsarStreamDisposedException() : base(typeof(PulsarStream).FullName) { }
+ }
+}
diff --git a/src/DotPulsar/Internal/Producer.cs b/src/DotPulsar/Internal/Producer.cs
index 2d850cf..9362a57 100644
--- a/src/DotPulsar/Internal/Producer.cs
+++ b/src/DotPulsar/Internal/Producer.cs
@@ -13,6 +13,7 @@
*/
using DotPulsar.Abstractions;
+using DotPulsar.Exceptions;
using DotPulsar.Internal.Abstractions;
using DotPulsar.Internal.Events;
using System;
@@ -102,7 +103,7 @@ namespace DotPulsar.Internal
private void ThrowIfDisposed()
{
if (_isDisposed != 0)
- throw new ObjectDisposedException(nameof(Producer));
+ throw new ProducerDisposedException();
}
}
}
diff --git a/src/DotPulsar/Internal/PulsarStream.cs b/src/DotPulsar/Internal/PulsarStream.cs
index 4fafbf2..99eef2f 100644
--- a/src/DotPulsar/Internal/PulsarStream.cs
+++ b/src/DotPulsar/Internal/PulsarStream.cs
@@ -13,6 +13,7 @@
*/
using DotPulsar.Internal.Abstractions;
+using DotPulsar.Internal.Exceptions;
using DotPulsar.Internal.Extensions;
using System;
using System.Buffers;
@@ -154,7 +155,7 @@ namespace DotPulsar.Internal
private void ThrowIfDisposed()
{
if (_isDisposed != 0)
- throw new ObjectDisposedException(nameof(PulsarStream));
+ throw new PulsarStreamDisposedException();
}
}
}
diff --git a/src/DotPulsar/Internal/Reader.cs b/src/DotPulsar/Internal/Reader.cs
index f444e3b..4804d67 100644
--- a/src/DotPulsar/Internal/Reader.cs
+++ b/src/DotPulsar/Internal/Reader.cs
@@ -13,6 +13,7 @@
*/
using DotPulsar.Abstractions;
+using DotPulsar.Exceptions;
using DotPulsar.Internal.Abstractions;
using DotPulsar.Internal.Events;
using System;
@@ -87,7 +88,7 @@ namespace DotPulsar.Internal
private void ThrowIfDisposed()
{
if (_isDisposed != 0)
- throw new ObjectDisposedException(nameof(Reader));
+ throw new ReaderDisposedException();
}
}
}
diff --git a/src/DotPulsar/PulsarClient.cs b/src/DotPulsar/PulsarClient.cs
index f368ce0..21e4aaf 100644
--- a/src/DotPulsar/PulsarClient.cs
+++ b/src/DotPulsar/PulsarClient.cs
@@ -13,6 +13,7 @@
*/
using DotPulsar.Abstractions;
+using DotPulsar.Exceptions;
using DotPulsar.Internal;
using DotPulsar.Internal.Abstractions;
using System;
@@ -95,7 +96,7 @@ namespace DotPulsar
private void ThrowIfDisposed()
{
if (_isDisposed != 0)
- throw new ObjectDisposedException(nameof(PulsarClient));
+ throw new PulsarClientDisposedException();
}
}
}