You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by bl...@apache.org on 2020/03/24 11:50:52 UTC

[pulsar-dotpulsar] branch master updated: Have all classes implement the dispose guard the same way and always retry internal XDisposedExceptions.

This is an automated email from the ASF dual-hosted git repository.

blankensteiner pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-dotpulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 52ea9ab  Have all classes implement the dispose guard the same way and always retry internal XDisposedExceptions.
52ea9ab is described below

commit 52ea9ab148f10957a639c843d7a202355a374884
Author: Daniel Blankensteiner <db...@vmail.dk>
AuthorDate: Tue Mar 24 12:50:40 2020 +0100

    Have all classes implement the dispose guard the same way and always retry internal XDisposedExceptions.
---
 DotPulsar.sln                                      | 31 +++++++++++++++-
 samples/DotPulsar.Samples.sln                      | 43 ----------------------
 .../DotPulsar.Stress.Tests.csproj                  |  2 +-
 src/DotPulsar.Tests/Internal/AsyncLockTests.cs     |  9 +++--
 .../Exceptions/ConsumerDisposedException.cs        | 10 +++++
 .../Exceptions/ProducerDisposedException.cs        | 10 +++++
 .../Exceptions/PulsarClientDisposedException.cs    |  9 +++++
 .../Exceptions/ReaderDisposedException.cs          | 10 +++++
 src/DotPulsar/Internal/AsyncLock.cs                | 18 +++++----
 src/DotPulsar/Internal/AsyncQueue.cs               | 20 +++++-----
 src/DotPulsar/Internal/Connection.cs               | 33 +++++++++++++++++
 src/DotPulsar/Internal/Consumer.cs                 |  3 +-
 src/DotPulsar/Internal/DefaultExceptionHandler.cs  |  4 ++
 .../Exceptions/AsyncLockDisposedException.cs       |  9 +++++
 .../Exceptions/AsyncQueueDisposedException.cs      |  9 +++++
 .../Exceptions/ConnectionDisposedException.cs      |  9 +++++
 .../Exceptions/PulsarStreamDisposedException.cs    |  9 +++++
 src/DotPulsar/Internal/Producer.cs                 |  3 +-
 src/DotPulsar/Internal/PulsarStream.cs             |  3 +-
 src/DotPulsar/Internal/Reader.cs                   |  3 +-
 src/DotPulsar/PulsarClient.cs                      |  3 +-
 21 files changed, 179 insertions(+), 71 deletions(-)

diff --git a/DotPulsar.sln b/DotPulsar.sln
index 97606de..4e4f131 100644
--- a/DotPulsar.sln
+++ b/DotPulsar.sln
@@ -7,7 +7,17 @@ Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "DotPulsar", "src\DotPulsar\
 EndProject
 Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "DotPulsar.Tests", "src\DotPulsar.Tests\DotPulsar.Tests.csproj", "{B3FCD2D5-8009-4281-ACDB-6A7BC99606B4}"
 EndProject
-Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "DotPulsar.Stress.Tests", "src\DotPulsar.Stress.Tests\DotPulsar.Stress.Tests.csproj", "{DDC68F51-DE1B-4794-9EE7-392C303CF5EB}"
+Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "DotPulsar.Stress.Tests", "src\DotPulsar.Stress.Tests\DotPulsar.Stress.Tests.csproj", "{DDC68F51-DE1B-4794-9EE7-392C303CF5EB}"
+EndProject
+Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "Tests", "Tests", "{E1C932A9-6D4C-4DDF-8922-BE7B71F12F1C}"
+EndProject
+Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "Samples", "Samples", "{E7106D0F-B255-4631-9FB8-734FC5748FA9}"
+EndProject
+Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Consuming", "samples\Consuming\Consuming.csproj", "{2A810EB9-45CE-4593-8E4C-026E0CBB3C42}"
+EndProject
+Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Producing", "samples\Producing\Producing.csproj", "{14934BED-A222-47B2-A58A-CFC4AAB89B49}"
+EndProject
+Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Reading", "samples\Reading\Reading.csproj", "{6D44683B-865C-4D15-9F0A-1A8441354589}"
 EndProject
 Global
 	GlobalSection(SolutionConfigurationPlatforms) = preSolution
@@ -27,10 +37,29 @@ Global
 		{DDC68F51-DE1B-4794-9EE7-392C303CF5EB}.Debug|Any CPU.Build.0 = Debug|Any CPU
 		{DDC68F51-DE1B-4794-9EE7-392C303CF5EB}.Release|Any CPU.ActiveCfg = Release|Any CPU
 		{DDC68F51-DE1B-4794-9EE7-392C303CF5EB}.Release|Any CPU.Build.0 = Release|Any CPU
+		{2A810EB9-45CE-4593-8E4C-026E0CBB3C42}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
+		{2A810EB9-45CE-4593-8E4C-026E0CBB3C42}.Debug|Any CPU.Build.0 = Debug|Any CPU
+		{2A810EB9-45CE-4593-8E4C-026E0CBB3C42}.Release|Any CPU.ActiveCfg = Release|Any CPU
+		{2A810EB9-45CE-4593-8E4C-026E0CBB3C42}.Release|Any CPU.Build.0 = Release|Any CPU
+		{14934BED-A222-47B2-A58A-CFC4AAB89B49}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
+		{14934BED-A222-47B2-A58A-CFC4AAB89B49}.Debug|Any CPU.Build.0 = Debug|Any CPU
+		{14934BED-A222-47B2-A58A-CFC4AAB89B49}.Release|Any CPU.ActiveCfg = Release|Any CPU
+		{14934BED-A222-47B2-A58A-CFC4AAB89B49}.Release|Any CPU.Build.0 = Release|Any CPU
+		{6D44683B-865C-4D15-9F0A-1A8441354589}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
+		{6D44683B-865C-4D15-9F0A-1A8441354589}.Debug|Any CPU.Build.0 = Debug|Any CPU
+		{6D44683B-865C-4D15-9F0A-1A8441354589}.Release|Any CPU.ActiveCfg = Release|Any CPU
+		{6D44683B-865C-4D15-9F0A-1A8441354589}.Release|Any CPU.Build.0 = Release|Any CPU
 	EndGlobalSection
 	GlobalSection(SolutionProperties) = preSolution
 		HideSolutionNode = FALSE
 	EndGlobalSection
+	GlobalSection(NestedProjects) = preSolution
+		{B3FCD2D5-8009-4281-ACDB-6A7BC99606B4} = {E1C932A9-6D4C-4DDF-8922-BE7B71F12F1C}
+		{DDC68F51-DE1B-4794-9EE7-392C303CF5EB} = {E1C932A9-6D4C-4DDF-8922-BE7B71F12F1C}
+		{2A810EB9-45CE-4593-8E4C-026E0CBB3C42} = {E7106D0F-B255-4631-9FB8-734FC5748FA9}
+		{14934BED-A222-47B2-A58A-CFC4AAB89B49} = {E7106D0F-B255-4631-9FB8-734FC5748FA9}
+		{6D44683B-865C-4D15-9F0A-1A8441354589} = {E7106D0F-B255-4631-9FB8-734FC5748FA9}
+	EndGlobalSection
 	GlobalSection(ExtensibilityGlobals) = postSolution
 		SolutionGuid = {88355922-E70A-4B73-B7F8-ABF8F2B59789}
 	EndGlobalSection
diff --git a/samples/DotPulsar.Samples.sln b/samples/DotPulsar.Samples.sln
deleted file mode 100644
index 6035487..0000000
--- a/samples/DotPulsar.Samples.sln
+++ /dev/null
@@ -1,43 +0,0 @@
-
-Microsoft Visual Studio Solution File, Format Version 12.00
-# Visual Studio Version 16
-VisualStudioVersion = 16.0.29806.167
-MinimumVisualStudioVersion = 10.0.40219.1
-Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Consuming", "Consuming\Consuming.csproj", "{A6EE5186-9893-4915-B439-1A85C2FB92C1}"
-EndProject
-Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Reading", "Reading\Reading.csproj", "{A2480730-3529-4EA5-9599-374D15371901}"
-EndProject
-Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Producing", "Producing\Producing.csproj", "{132924A6-B793-4EAE-8574-C8D7E3B38C0D}"
-EndProject
-Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "DotPulsar", "..\src\DotPulsar\DotPulsar.csproj", "{BA92EB0A-F71C-4CF6-8719-70CCE149EF4D}"
-EndProject
-Global
-	GlobalSection(SolutionConfigurationPlatforms) = preSolution
-		Debug|Any CPU = Debug|Any CPU
-		Release|Any CPU = Release|Any CPU
-	EndGlobalSection
-	GlobalSection(ProjectConfigurationPlatforms) = postSolution
-		{A6EE5186-9893-4915-B439-1A85C2FB92C1}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
-		{A6EE5186-9893-4915-B439-1A85C2FB92C1}.Debug|Any CPU.Build.0 = Debug|Any CPU
-		{A6EE5186-9893-4915-B439-1A85C2FB92C1}.Release|Any CPU.ActiveCfg = Release|Any CPU
-		{A6EE5186-9893-4915-B439-1A85C2FB92C1}.Release|Any CPU.Build.0 = Release|Any CPU
-		{A2480730-3529-4EA5-9599-374D15371901}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
-		{A2480730-3529-4EA5-9599-374D15371901}.Debug|Any CPU.Build.0 = Debug|Any CPU
-		{A2480730-3529-4EA5-9599-374D15371901}.Release|Any CPU.ActiveCfg = Release|Any CPU
-		{A2480730-3529-4EA5-9599-374D15371901}.Release|Any CPU.Build.0 = Release|Any CPU
-		{132924A6-B793-4EAE-8574-C8D7E3B38C0D}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
-		{132924A6-B793-4EAE-8574-C8D7E3B38C0D}.Debug|Any CPU.Build.0 = Debug|Any CPU
-		{132924A6-B793-4EAE-8574-C8D7E3B38C0D}.Release|Any CPU.ActiveCfg = Release|Any CPU
-		{132924A6-B793-4EAE-8574-C8D7E3B38C0D}.Release|Any CPU.Build.0 = Release|Any CPU
-		{BA92EB0A-F71C-4CF6-8719-70CCE149EF4D}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
-		{BA92EB0A-F71C-4CF6-8719-70CCE149EF4D}.Debug|Any CPU.Build.0 = Debug|Any CPU
-		{BA92EB0A-F71C-4CF6-8719-70CCE149EF4D}.Release|Any CPU.ActiveCfg = Release|Any CPU
-		{BA92EB0A-F71C-4CF6-8719-70CCE149EF4D}.Release|Any CPU.Build.0 = Release|Any CPU
-	EndGlobalSection
-	GlobalSection(SolutionProperties) = preSolution
-		HideSolutionNode = FALSE
-	EndGlobalSection
-	GlobalSection(ExtensibilityGlobals) = postSolution
-		SolutionGuid = {DD71ECF8-282A-4752-AB26-6290B85B709D}
-	EndGlobalSection
-EndGlobal
diff --git a/src/DotPulsar.Stress.Tests/DotPulsar.Stress.Tests.csproj b/src/DotPulsar.Stress.Tests/DotPulsar.Stress.Tests.csproj
index d04a824..77ce962 100644
--- a/src/DotPulsar.Stress.Tests/DotPulsar.Stress.Tests.csproj
+++ b/src/DotPulsar.Stress.Tests/DotPulsar.Stress.Tests.csproj
@@ -16,7 +16,7 @@
       <PrivateAssets>all</PrivateAssets>
       <IncludeAssets>runtime; build; native; contentfiles; analyzers; buildtransitive</IncludeAssets>
     </PackageReference>
-    <PackageReference Include="FluentAssertions" Version="5.10.2" />
+    <PackageReference Include="FluentAssertions" Version="5.10.3" />
   </ItemGroup>
 
   <ItemGroup>
diff --git a/src/DotPulsar.Tests/Internal/AsyncLockTests.cs b/src/DotPulsar.Tests/Internal/AsyncLockTests.cs
index 766e9ad..fa151e8 100644
--- a/src/DotPulsar.Tests/Internal/AsyncLockTests.cs
+++ b/src/DotPulsar.Tests/Internal/AsyncLockTests.cs
@@ -13,6 +13,7 @@
  */
 
 using DotPulsar.Internal;
+using DotPulsar.Internal.Exceptions;
 using System;
 using System.Threading;
 using System.Threading.Tasks;
@@ -59,7 +60,7 @@ namespace DotPulsar.Tests.Internal
         }
 
         [Fact]
-        public async Task Lock_GivenLockIsDisposed_ShouldThrowObjectDisposedException()
+        public async Task Lock_GivenLockIsDisposed_ShouldThrowAsyncLockDisposedException()
         {
             //Arrange
             var sut = new AsyncLock();
@@ -69,11 +70,11 @@ namespace DotPulsar.Tests.Internal
             var exception = await Record.ExceptionAsync(() => sut.Lock(CancellationToken.None));
 
             //Assert
-            Assert.IsType<ObjectDisposedException>(exception);
+            Assert.IsType<AsyncLockDisposedException>(exception);
         }
 
         [Fact]
-        public async Task Lock_GivenLockIsDisposedWhileAwaitingLock_ShouldThrowObjectDisposedException()
+        public async Task Lock_GivenLockIsDisposedWhileAwaitingLock_ShouldThrowTaskCanceledException()
         {
             //Arrange
             var sut = new AsyncLock();
@@ -85,7 +86,7 @@ namespace DotPulsar.Tests.Internal
             var exception = await Record.ExceptionAsync(() => awaiting);
 
             //Assert
-            Assert.IsType<ObjectDisposedException>(exception);
+            Assert.IsType<TaskCanceledException>(exception);
 
             //Annihilate
             await sut.DisposeAsync();
diff --git a/src/DotPulsar/Exceptions/ConsumerDisposedException.cs b/src/DotPulsar/Exceptions/ConsumerDisposedException.cs
new file mode 100644
index 0000000..f6079bd
--- /dev/null
+++ b/src/DotPulsar/Exceptions/ConsumerDisposedException.cs
@@ -0,0 +1,10 @@
+using DotPulsar.Internal;
+using System;
+
+namespace DotPulsar.Exceptions
+{
+    public sealed class ConsumerDisposedException : ObjectDisposedException
+    {
+        public ConsumerDisposedException() : base(typeof(Consumer).FullName) { }
+    }
+}
diff --git a/src/DotPulsar/Exceptions/ProducerDisposedException.cs b/src/DotPulsar/Exceptions/ProducerDisposedException.cs
new file mode 100644
index 0000000..c881b45
--- /dev/null
+++ b/src/DotPulsar/Exceptions/ProducerDisposedException.cs
@@ -0,0 +1,10 @@
+using DotPulsar.Internal;
+using System;
+
+namespace DotPulsar.Exceptions
+{
+    public sealed class ProducerDisposedException : ObjectDisposedException
+    {
+        public ProducerDisposedException() : base(typeof(Producer).FullName) { }
+    }
+}
diff --git a/src/DotPulsar/Exceptions/PulsarClientDisposedException.cs b/src/DotPulsar/Exceptions/PulsarClientDisposedException.cs
new file mode 100644
index 0000000..f4a4f00
--- /dev/null
+++ b/src/DotPulsar/Exceptions/PulsarClientDisposedException.cs
@@ -0,0 +1,9 @@
+using System;
+
+namespace DotPulsar.Exceptions
+{
+    public sealed class PulsarClientDisposedException : ObjectDisposedException
+    {
+        public PulsarClientDisposedException() : base(typeof(PulsarClient).FullName) { }
+    }
+}
diff --git a/src/DotPulsar/Exceptions/ReaderDisposedException.cs b/src/DotPulsar/Exceptions/ReaderDisposedException.cs
new file mode 100644
index 0000000..ee02457
--- /dev/null
+++ b/src/DotPulsar/Exceptions/ReaderDisposedException.cs
@@ -0,0 +1,10 @@
+using DotPulsar.Internal;
+using System;
+
+namespace DotPulsar.Exceptions
+{
+    public sealed class ReaderDisposedException : ObjectDisposedException
+    {
+        public ReaderDisposedException() : base(typeof(Reader).FullName) { }
+    }
+}
diff --git a/src/DotPulsar/Internal/AsyncLock.cs b/src/DotPulsar/Internal/AsyncLock.cs
index de04523..592030b 100644
--- a/src/DotPulsar/Internal/AsyncLock.cs
+++ b/src/DotPulsar/Internal/AsyncLock.cs
@@ -12,6 +12,7 @@
  * limitations under the License.
  */
 
+using DotPulsar.Internal.Exceptions;
 using System;
 using System.Collections.Generic;
 using System.Threading;
@@ -25,7 +26,7 @@ namespace DotPulsar.Internal
         private readonly SemaphoreSlim _semaphoreSlim;
         private readonly Releaser _releaser;
         private readonly Task<IDisposable> _completedTask;
-        private bool _isDisposed;
+        private int _isDisposed;
 
         public AsyncLock()
         {
@@ -33,7 +34,6 @@ namespace DotPulsar.Internal
             _semaphoreSlim = new SemaphoreSlim(1, 1);
             _releaser = new Releaser(Release);
             _completedTask = Task.FromResult((IDisposable)_releaser);
-            _isDisposed = false;
         }
 
         public Task<IDisposable> Lock(CancellationToken cancellationToken)
@@ -42,8 +42,7 @@ namespace DotPulsar.Internal
 
             lock (_pending)
             {
-                if (_isDisposed)
-                    throw new ObjectDisposedException(nameof(AsyncLock));
+                ThrowIfDisposed();
 
                 if (_semaphoreSlim.CurrentCount == 1) //Lock is free
                 {
@@ -65,14 +64,11 @@ namespace DotPulsar.Internal
         {
             lock (_pending)
             {
-                if (_isDisposed)
+                if (Interlocked.Exchange(ref _isDisposed, 1) != 0)
                     return;
 
-                _isDisposed = true;
-
                 foreach (var pending in _pending)
                 {
-                    pending.SetException(new ObjectDisposedException(nameof(AsyncLock)));
                     pending.Dispose();
                 }
 
@@ -118,6 +114,12 @@ namespace DotPulsar.Internal
             }
         }
 
+        private void ThrowIfDisposed()
+        {
+            if (_isDisposed != 0)
+                throw new AsyncLockDisposedException();
+        }
+
         private class Releaser : IDisposable
         {
             private readonly Action _release;
diff --git a/src/DotPulsar/Internal/AsyncQueue.cs b/src/DotPulsar/Internal/AsyncQueue.cs
index a6c739d..70519de 100644
--- a/src/DotPulsar/Internal/AsyncQueue.cs
+++ b/src/DotPulsar/Internal/AsyncQueue.cs
@@ -13,6 +13,7 @@
  */
 
 using DotPulsar.Internal.Abstractions;
+using DotPulsar.Internal.Exceptions;
 using System;
 using System.Collections.Generic;
 using System.Threading;
@@ -25,22 +26,20 @@ namespace DotPulsar.Internal
         private readonly object _lock;
         private readonly Queue<T> _queue;
         private readonly LinkedList<CancelableCompletionSource<T>> _pendingDequeues;
-        private bool _isDisposed;
+        private int _isDisposed;
 
         public AsyncQueue()
         {
             _lock = new object();
             _queue = new Queue<T>();
             _pendingDequeues = new LinkedList<CancelableCompletionSource<T>>();
-            _isDisposed = false;
         }
 
         public void Enqueue(T item)
         {
             lock (_lock)
             {
-                if (_isDisposed)
-                    throw new ObjectDisposedException(nameof(AsyncQueue<T>));
+                ThrowIfDisposed();
 
                 if (_pendingDequeues.Count > 0)
                 {
@@ -59,8 +58,7 @@ namespace DotPulsar.Internal
 
             lock (_lock)
             {
-                if (_isDisposed)
-                    throw new ObjectDisposedException(nameof(AsyncQueue<T>));
+                ThrowIfDisposed();
 
                 if (_queue.Count > 0)
                     return new ValueTask<T>(_queue.Dequeue());
@@ -76,11 +74,9 @@ namespace DotPulsar.Internal
         {
             lock (_lock)
             {
-                if (_isDisposed)
+                if (Interlocked.Exchange(ref _isDisposed, 1) != 0)
                     return;
 
-                _isDisposed = true;
-
                 foreach (var pendingDequeue in _pendingDequeues)
                     pendingDequeue.Dispose();
 
@@ -101,5 +97,11 @@ namespace DotPulsar.Internal
                 catch { }
             }
         }
+
+        private void ThrowIfDisposed()
+        {
+            if (_isDisposed != 0)
+                throw new AsyncQueueDisposedException();
+        }
     }
 }
diff --git a/src/DotPulsar/Internal/Connection.cs b/src/DotPulsar/Internal/Connection.cs
index eb27e11..4e168df 100644
--- a/src/DotPulsar/Internal/Connection.cs
+++ b/src/DotPulsar/Internal/Connection.cs
@@ -13,6 +13,7 @@
  */
 
 using DotPulsar.Internal.Abstractions;
+using DotPulsar.Internal.Exceptions;
 using DotPulsar.Internal.Extensions;
 using DotPulsar.Internal.PulsarApi;
 using System.Threading;
@@ -27,6 +28,7 @@ namespace DotPulsar.Internal
         private readonly RequestResponseHandler _requestResponseHandler;
         private readonly PingPongHandler _pingPongHandler;
         private readonly IPulsarStream _stream;
+        private int _isDisposed;
 
         public Connection(IPulsarStream stream)
         {
@@ -39,6 +41,8 @@ namespace DotPulsar.Internal
 
         public async ValueTask<bool> HasChannels(CancellationToken cancellationToken)
         {
+            ThrowIfDisposed();
+
             using (await _lock.Lock(cancellationToken))
             {
                 return _channelManager.HasChannels();
@@ -47,6 +51,8 @@ namespace DotPulsar.Internal
 
         public async Task<ProducerResponse> Send(CommandProducer command, IChannel channel, CancellationToken cancellationToken)
         {
+            ThrowIfDisposed();
+
             Task<ProducerResponse>? responseTask = null;
 
             using (await _lock.Lock(cancellationToken))
@@ -63,6 +69,8 @@ namespace DotPulsar.Internal
 
         public async Task<SubscribeResponse> Send(CommandSubscribe command, IChannel channel, CancellationToken cancellationToken)
         {
+            ThrowIfDisposed();
+
             Task<SubscribeResponse>? responseTask = null;
 
             using (await _lock.Lock(cancellationToken))
@@ -91,6 +99,8 @@ namespace DotPulsar.Internal
 
         public async Task<BaseCommand> Send(CommandUnsubscribe command, CancellationToken cancellationToken)
         {
+            ThrowIfDisposed();
+
             Task<BaseCommand>? responseTask = null;
 
             using (await _lock.Lock(cancellationToken))
@@ -119,6 +129,8 @@ namespace DotPulsar.Internal
 
         public async Task<BaseCommand> Send(CommandCloseProducer command, CancellationToken cancellationToken)
         {
+            ThrowIfDisposed();
+
             Task<BaseCommand>? responseTask = null;
 
             using (await _lock.Lock(cancellationToken))
@@ -135,6 +147,8 @@ namespace DotPulsar.Internal
 
         public async Task<BaseCommand> Send(CommandCloseConsumer command, CancellationToken cancellationToken)
         {
+            ThrowIfDisposed();
+
             Task<BaseCommand>? responseTask = null;
 
             using (await _lock.Lock(cancellationToken))
@@ -151,7 +165,10 @@ namespace DotPulsar.Internal
 
         public async Task<BaseCommand> Send(SendPackage command, CancellationToken cancellationToken)
         {
+            ThrowIfDisposed();
+
             Task<BaseCommand>? response = null;
+
             using (await _lock.Lock(cancellationToken))
             {
                 var baseCommand = command.Command.AsBaseCommand();
@@ -159,23 +176,30 @@ namespace DotPulsar.Internal
                 var sequence = Serializer.Serialize(baseCommand, command.Metadata, command.Payload);
                 await _stream.Send(sequence);
             }
+
             return await response;
         }
 
         private async Task<BaseCommand> SendRequestResponse(BaseCommand command, CancellationToken cancellationToken)
         {
+            ThrowIfDisposed();
+
             Task<BaseCommand>? response = null;
+
             using (await _lock.Lock(cancellationToken))
             {
                 response = _requestResponseHandler.Outgoing(command);
                 var sequence = Serializer.Serialize(command);
                 await _stream.Send(sequence);
             }
+
             return await response;
         }
 
         private async Task Send(BaseCommand command, CancellationToken cancellationToken)
         {
+            ThrowIfDisposed();
+
             using (await _lock.Lock(cancellationToken))
             {
                 var sequence = Serializer.Serialize(command);
@@ -225,10 +249,19 @@ namespace DotPulsar.Internal
 
         public async ValueTask DisposeAsync()
         {
+            if (Interlocked.Exchange(ref _isDisposed, 1) != 0)
+                return;
+
             await _lock.DisposeAsync();
             _requestResponseHandler.Dispose();
             _channelManager.Dispose();
             await _stream.DisposeAsync();
         }
+
+        private void ThrowIfDisposed()
+        {
+            if (_isDisposed != 0)
+                throw new ConnectionDisposedException();
+        }
     }
 }
diff --git a/src/DotPulsar/Internal/Consumer.cs b/src/DotPulsar/Internal/Consumer.cs
index 9b623ad..2f576b7 100644
--- a/src/DotPulsar/Internal/Consumer.cs
+++ b/src/DotPulsar/Internal/Consumer.cs
@@ -13,6 +13,7 @@
  */
 
 using DotPulsar.Abstractions;
+using DotPulsar.Exceptions;
 using DotPulsar.Internal.Abstractions;
 using DotPulsar.Internal.Events;
 using DotPulsar.Internal.PulsarApi;
@@ -135,7 +136,7 @@ namespace DotPulsar.Internal
         private void ThrowIfDisposed()
         {
             if (_isDisposed != 0)
-                throw new ObjectDisposedException(GetType().FullName);
+                throw new ConsumerDisposedException();
         }
     }
 }
diff --git a/src/DotPulsar/Internal/DefaultExceptionHandler.cs b/src/DotPulsar/Internal/DefaultExceptionHandler.cs
index cb0e6b0..bf182b2 100644
--- a/src/DotPulsar/Internal/DefaultExceptionHandler.cs
+++ b/src/DotPulsar/Internal/DefaultExceptionHandler.cs
@@ -44,6 +44,10 @@ namespace DotPulsar.Internal
                 case TooManyRequestsException _: return FaultAction.Retry;
                 case ChannelNotReadyException _: return FaultAction.Retry;
                 case ServiceNotReadyException _: return FaultAction.Retry;
+                case ConnectionDisposedException _: return FaultAction.Retry;
+                case AsyncLockDisposedException _: return FaultAction.Retry;
+                case PulsarStreamDisposedException _: return FaultAction.Retry;
+                case AsyncQueueDisposedException _: return FaultAction.Retry;
                 case OperationCanceledException _: return cancellationToken.IsCancellationRequested ? FaultAction.Rethrow : FaultAction.Retry;
                 case DotPulsarException _: return FaultAction.Rethrow;
                 case SocketException socketException:
diff --git a/src/DotPulsar/Internal/Exceptions/AsyncLockDisposedException.cs b/src/DotPulsar/Internal/Exceptions/AsyncLockDisposedException.cs
new file mode 100644
index 0000000..b3dbbdd
--- /dev/null
+++ b/src/DotPulsar/Internal/Exceptions/AsyncLockDisposedException.cs
@@ -0,0 +1,9 @@
+using System;
+
+namespace DotPulsar.Internal.Exceptions
+{
+    public sealed class AsyncLockDisposedException : ObjectDisposedException
+    {
+        public AsyncLockDisposedException() : base(typeof(AsyncLock).FullName) { }
+    }
+}
diff --git a/src/DotPulsar/Internal/Exceptions/AsyncQueueDisposedException.cs b/src/DotPulsar/Internal/Exceptions/AsyncQueueDisposedException.cs
new file mode 100644
index 0000000..27156cd
--- /dev/null
+++ b/src/DotPulsar/Internal/Exceptions/AsyncQueueDisposedException.cs
@@ -0,0 +1,9 @@
+using System;
+
+namespace DotPulsar.Internal.Exceptions
+{
+    public sealed class AsyncQueueDisposedException : ObjectDisposedException
+    {
+        public AsyncQueueDisposedException() : base(typeof(AsyncQueue<>).FullName) { }
+    }
+}
diff --git a/src/DotPulsar/Internal/Exceptions/ConnectionDisposedException.cs b/src/DotPulsar/Internal/Exceptions/ConnectionDisposedException.cs
new file mode 100644
index 0000000..e445908
--- /dev/null
+++ b/src/DotPulsar/Internal/Exceptions/ConnectionDisposedException.cs
@@ -0,0 +1,9 @@
+using System;
+
+namespace DotPulsar.Internal.Exceptions
+{
+    public sealed class ConnectionDisposedException : ObjectDisposedException
+    {
+        public ConnectionDisposedException() : base(typeof(Connection).FullName) { }
+    }
+}
diff --git a/src/DotPulsar/Internal/Exceptions/PulsarStreamDisposedException.cs b/src/DotPulsar/Internal/Exceptions/PulsarStreamDisposedException.cs
new file mode 100644
index 0000000..8bd4567
--- /dev/null
+++ b/src/DotPulsar/Internal/Exceptions/PulsarStreamDisposedException.cs
@@ -0,0 +1,9 @@
+using System;
+
+namespace DotPulsar.Internal.Exceptions
+{
+    public sealed class PulsarStreamDisposedException : ObjectDisposedException
+    {
+        public PulsarStreamDisposedException() : base(typeof(PulsarStream).FullName) { }
+    }
+}
diff --git a/src/DotPulsar/Internal/Producer.cs b/src/DotPulsar/Internal/Producer.cs
index 2d850cf..9362a57 100644
--- a/src/DotPulsar/Internal/Producer.cs
+++ b/src/DotPulsar/Internal/Producer.cs
@@ -13,6 +13,7 @@
  */
 
 using DotPulsar.Abstractions;
+using DotPulsar.Exceptions;
 using DotPulsar.Internal.Abstractions;
 using DotPulsar.Internal.Events;
 using System;
@@ -102,7 +103,7 @@ namespace DotPulsar.Internal
         private void ThrowIfDisposed()
         {
             if (_isDisposed != 0)
-                throw new ObjectDisposedException(nameof(Producer));
+                throw new ProducerDisposedException();
         }
     }
 }
diff --git a/src/DotPulsar/Internal/PulsarStream.cs b/src/DotPulsar/Internal/PulsarStream.cs
index 4fafbf2..99eef2f 100644
--- a/src/DotPulsar/Internal/PulsarStream.cs
+++ b/src/DotPulsar/Internal/PulsarStream.cs
@@ -13,6 +13,7 @@
  */
 
 using DotPulsar.Internal.Abstractions;
+using DotPulsar.Internal.Exceptions;
 using DotPulsar.Internal.Extensions;
 using System;
 using System.Buffers;
@@ -154,7 +155,7 @@ namespace DotPulsar.Internal
         private void ThrowIfDisposed()
         {
             if (_isDisposed != 0)
-                throw new ObjectDisposedException(nameof(PulsarStream));
+                throw new PulsarStreamDisposedException();
         }
     }
 }
diff --git a/src/DotPulsar/Internal/Reader.cs b/src/DotPulsar/Internal/Reader.cs
index f444e3b..4804d67 100644
--- a/src/DotPulsar/Internal/Reader.cs
+++ b/src/DotPulsar/Internal/Reader.cs
@@ -13,6 +13,7 @@
  */
 
 using DotPulsar.Abstractions;
+using DotPulsar.Exceptions;
 using DotPulsar.Internal.Abstractions;
 using DotPulsar.Internal.Events;
 using System;
@@ -87,7 +88,7 @@ namespace DotPulsar.Internal
         private void ThrowIfDisposed()
         {
             if (_isDisposed != 0)
-                throw new ObjectDisposedException(nameof(Reader));
+                throw new ReaderDisposedException();
         }
     }
 }
diff --git a/src/DotPulsar/PulsarClient.cs b/src/DotPulsar/PulsarClient.cs
index f368ce0..21e4aaf 100644
--- a/src/DotPulsar/PulsarClient.cs
+++ b/src/DotPulsar/PulsarClient.cs
@@ -13,6 +13,7 @@
  */
 
 using DotPulsar.Abstractions;
+using DotPulsar.Exceptions;
 using DotPulsar.Internal;
 using DotPulsar.Internal.Abstractions;
 using System;
@@ -95,7 +96,7 @@ namespace DotPulsar
         private void ThrowIfDisposed()
         {
             if (_isDisposed != 0)
-                throw new ObjectDisposedException(nameof(PulsarClient));
+                throw new PulsarClientDisposedException();
         }
     }
 }