You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by bl...@apache.org on 2020/06/26 07:13:28 UTC

[pulsar-dotpulsar] branch master updated: Dispose channels to ensure the consumer and reader attach to the new queue.

This is an automated email from the ASF dual-hosted git repository.

blankensteiner pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-dotpulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 05bb554  Dispose channels to ensure the consumer and reader attach to the new queue.
05bb554 is described below

commit 05bb554b9c5cc654ce01c28f384ff2f1dd8a5c0e
Author: Daniel Blankensteiner <db...@danskecommodities.com>
AuthorDate: Fri Jun 26 09:13:18 2020 +0200

    Dispose channels to ensure the consumer and reader attach to the new queue.
---
 src/DotPulsar/Internal/Consumer.cs        | 13 +++++++++++--
 src/DotPulsar/Internal/ConsumerProcess.cs | 14 ++------------
 src/DotPulsar/Internal/Producer.cs        | 13 +++++++++++--
 src/DotPulsar/Internal/ProducerProcess.cs | 14 ++------------
 src/DotPulsar/Internal/Reader.cs          | 13 +++++++++++--
 src/DotPulsar/Internal/ReaderProcess.cs   | 14 ++------------
 6 files changed, 39 insertions(+), 42 deletions(-)

diff --git a/src/DotPulsar/Internal/Consumer.cs b/src/DotPulsar/Internal/Consumer.cs
index 2090cdf..80e6369 100644
--- a/src/DotPulsar/Internal/Consumer.cs
+++ b/src/DotPulsar/Internal/Consumer.cs
@@ -145,10 +145,19 @@ namespace DotPulsar.Internal
             }, cancellationToken).ConfigureAwait(false);
         }
 
-        internal void SetChannel(IConsumerChannel channel)
+        internal async ValueTask SetChannel(IConsumerChannel channel)
         {
-            ThrowIfDisposed();
+            if (_isDisposed != 0)
+            {
+                await channel.DisposeAsync().ConfigureAwait(false);
+                return;
+            }
+
+            var oldChannel = _channel;
             _channel = channel;
+
+            if (oldChannel != null)
+                await oldChannel.DisposeAsync().ConfigureAwait(false);
         }
 
         private void ThrowIfDisposed()
diff --git a/src/DotPulsar/Internal/ConsumerProcess.cs b/src/DotPulsar/Internal/ConsumerProcess.cs
index fabf4d8..449aedb 100644
--- a/src/DotPulsar/Internal/ConsumerProcess.cs
+++ b/src/DotPulsar/Internal/ConsumerProcess.cs
@@ -84,18 +84,8 @@ namespace DotPulsar.Internal
 
         private async void SetupChannel()
         {
-            IConsumerChannel? channel = null;
-
-            try
-            {
-                channel = await _factory.Create(CancellationTokenSource.Token).ConfigureAwait(false);
-                _consumer.SetChannel(channel);
-            }
-            catch
-            {
-                if (channel != null)
-                    await channel.DisposeAsync().ConfigureAwait(false);
-            }
+            var channel = await _factory.Create(CancellationTokenSource.Token).ConfigureAwait(false);
+            await _consumer.SetChannel(channel).ConfigureAwait(false);
         }
     }
 }
diff --git a/src/DotPulsar/Internal/Producer.cs b/src/DotPulsar/Internal/Producer.cs
index a40c494..3303d01 100644
--- a/src/DotPulsar/Internal/Producer.cs
+++ b/src/DotPulsar/Internal/Producer.cs
@@ -108,10 +108,19 @@ namespace DotPulsar.Internal
             return new MessageId(response.MessageId);
         }
 
-        internal void SetChannel(IProducerChannel channel)
+        internal async ValueTask SetChannel(IProducerChannel channel)
         {
-            ThrowIfDisposed();
+            if (_isDisposed != 0)
+            {
+                await channel.DisposeAsync().ConfigureAwait(false);
+                return;
+            }
+
+            var oldChannel = _channel;
             _channel = channel;
+
+            if (oldChannel != null)
+                await oldChannel.DisposeAsync().ConfigureAwait(false);
         }
 
         private void ThrowIfDisposed()
diff --git a/src/DotPulsar/Internal/ProducerProcess.cs b/src/DotPulsar/Internal/ProducerProcess.cs
index c2f1cba..fad638a 100644
--- a/src/DotPulsar/Internal/ProducerProcess.cs
+++ b/src/DotPulsar/Internal/ProducerProcess.cs
@@ -68,18 +68,8 @@ namespace DotPulsar.Internal
 
         private async void SetupChannel()
         {
-            IProducerChannel? channel = null;
-
-            try
-            {
-                channel = await _factory.Create(CancellationTokenSource.Token).ConfigureAwait(false);
-                _producer.SetChannel(channel);
-            }
-            catch
-            {
-                if (channel != null)
-                    await channel.DisposeAsync().ConfigureAwait(false);
-            }
+            var channel = await _factory.Create(CancellationTokenSource.Token).ConfigureAwait(false);
+            await _producer.SetChannel(channel).ConfigureAwait(false);
         }
     }
 }
diff --git a/src/DotPulsar/Internal/Reader.cs b/src/DotPulsar/Internal/Reader.cs
index 2cab4a9..03f4e4f 100644
--- a/src/DotPulsar/Internal/Reader.cs
+++ b/src/DotPulsar/Internal/Reader.cs
@@ -89,10 +89,19 @@ namespace DotPulsar.Internal
             await _channel.DisposeAsync().ConfigureAwait(false);
         }
 
-        internal void SetChannel(IReaderChannel channel)
+        internal async ValueTask SetChannel(IReaderChannel channel)
         {
-            ThrowIfDisposed();
+            if (_isDisposed != 0)
+            {
+                await channel.DisposeAsync().ConfigureAwait(false);
+                return;
+            }
+
+            var oldChannel = _channel;
             _channel = channel;
+
+            if (oldChannel != null)
+                await oldChannel.DisposeAsync().ConfigureAwait(false);
         }
 
         private void ThrowIfDisposed()
diff --git a/src/DotPulsar/Internal/ReaderProcess.cs b/src/DotPulsar/Internal/ReaderProcess.cs
index f5ef119..9b9a438 100644
--- a/src/DotPulsar/Internal/ReaderProcess.cs
+++ b/src/DotPulsar/Internal/ReaderProcess.cs
@@ -71,18 +71,8 @@ namespace DotPulsar.Internal
 
         private async void SetupChannel()
         {
-            IReaderChannel? channel = null;
-
-            try
-            {
-                channel = await _factory.Create(CancellationTokenSource.Token).ConfigureAwait(false);
-                _reader.SetChannel(channel);
-            }
-            catch
-            {
-                if (channel != null)
-                    await channel.DisposeAsync().ConfigureAwait(false);
-            }
+            var channel = await _factory.Create(CancellationTokenSource.Token).ConfigureAwait(false);
+            await _reader.SetChannel(channel).ConfigureAwait(false);
         }
     }
 }