You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by bl...@apache.org on 2020/06/26 07:13:28 UTC
[pulsar-dotpulsar] branch master updated: Dispose channels to ensure the consumer and reader attach to the new queue.
This is an automated email from the ASF dual-hosted git repository.
blankensteiner pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-dotpulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 05bb554 Dispose channels to ensure the consumer and reader attach to the new queue.
05bb554 is described below
commit 05bb554b9c5cc654ce01c28f384ff2f1dd8a5c0e
Author: Daniel Blankensteiner <db...@danskecommodities.com>
AuthorDate: Fri Jun 26 09:13:18 2020 +0200
Dispose channels to ensure the consumer and reader attach to the new queue.
---
src/DotPulsar/Internal/Consumer.cs | 13 +++++++++++--
src/DotPulsar/Internal/ConsumerProcess.cs | 14 ++------------
src/DotPulsar/Internal/Producer.cs | 13 +++++++++++--
src/DotPulsar/Internal/ProducerProcess.cs | 14 ++------------
src/DotPulsar/Internal/Reader.cs | 13 +++++++++++--
src/DotPulsar/Internal/ReaderProcess.cs | 14 ++------------
6 files changed, 39 insertions(+), 42 deletions(-)
diff --git a/src/DotPulsar/Internal/Consumer.cs b/src/DotPulsar/Internal/Consumer.cs
index 2090cdf..80e6369 100644
--- a/src/DotPulsar/Internal/Consumer.cs
+++ b/src/DotPulsar/Internal/Consumer.cs
@@ -145,10 +145,19 @@ namespace DotPulsar.Internal
}, cancellationToken).ConfigureAwait(false);
}
- internal void SetChannel(IConsumerChannel channel)
+ internal async ValueTask SetChannel(IConsumerChannel channel)
{
- ThrowIfDisposed();
+ if (_isDisposed != 0)
+ {
+ await channel.DisposeAsync().ConfigureAwait(false);
+ return;
+ }
+
+ var oldChannel = _channel;
_channel = channel;
+
+ if (oldChannel != null)
+ await oldChannel.DisposeAsync().ConfigureAwait(false);
}
private void ThrowIfDisposed()
diff --git a/src/DotPulsar/Internal/ConsumerProcess.cs b/src/DotPulsar/Internal/ConsumerProcess.cs
index fabf4d8..449aedb 100644
--- a/src/DotPulsar/Internal/ConsumerProcess.cs
+++ b/src/DotPulsar/Internal/ConsumerProcess.cs
@@ -84,18 +84,8 @@ namespace DotPulsar.Internal
private async void SetupChannel()
{
- IConsumerChannel? channel = null;
-
- try
- {
- channel = await _factory.Create(CancellationTokenSource.Token).ConfigureAwait(false);
- _consumer.SetChannel(channel);
- }
- catch
- {
- if (channel != null)
- await channel.DisposeAsync().ConfigureAwait(false);
- }
+ var channel = await _factory.Create(CancellationTokenSource.Token).ConfigureAwait(false);
+ await _consumer.SetChannel(channel).ConfigureAwait(false);
}
}
}
diff --git a/src/DotPulsar/Internal/Producer.cs b/src/DotPulsar/Internal/Producer.cs
index a40c494..3303d01 100644
--- a/src/DotPulsar/Internal/Producer.cs
+++ b/src/DotPulsar/Internal/Producer.cs
@@ -108,10 +108,19 @@ namespace DotPulsar.Internal
return new MessageId(response.MessageId);
}
- internal void SetChannel(IProducerChannel channel)
+ internal async ValueTask SetChannel(IProducerChannel channel)
{
- ThrowIfDisposed();
+ if (_isDisposed != 0)
+ {
+ await channel.DisposeAsync().ConfigureAwait(false);
+ return;
+ }
+
+ var oldChannel = _channel;
_channel = channel;
+
+ if (oldChannel != null)
+ await oldChannel.DisposeAsync().ConfigureAwait(false);
}
private void ThrowIfDisposed()
diff --git a/src/DotPulsar/Internal/ProducerProcess.cs b/src/DotPulsar/Internal/ProducerProcess.cs
index c2f1cba..fad638a 100644
--- a/src/DotPulsar/Internal/ProducerProcess.cs
+++ b/src/DotPulsar/Internal/ProducerProcess.cs
@@ -68,18 +68,8 @@ namespace DotPulsar.Internal
private async void SetupChannel()
{
- IProducerChannel? channel = null;
-
- try
- {
- channel = await _factory.Create(CancellationTokenSource.Token).ConfigureAwait(false);
- _producer.SetChannel(channel);
- }
- catch
- {
- if (channel != null)
- await channel.DisposeAsync().ConfigureAwait(false);
- }
+ var channel = await _factory.Create(CancellationTokenSource.Token).ConfigureAwait(false);
+ await _producer.SetChannel(channel).ConfigureAwait(false);
}
}
}
diff --git a/src/DotPulsar/Internal/Reader.cs b/src/DotPulsar/Internal/Reader.cs
index 2cab4a9..03f4e4f 100644
--- a/src/DotPulsar/Internal/Reader.cs
+++ b/src/DotPulsar/Internal/Reader.cs
@@ -89,10 +89,19 @@ namespace DotPulsar.Internal
await _channel.DisposeAsync().ConfigureAwait(false);
}
- internal void SetChannel(IReaderChannel channel)
+ internal async ValueTask SetChannel(IReaderChannel channel)
{
- ThrowIfDisposed();
+ if (_isDisposed != 0)
+ {
+ await channel.DisposeAsync().ConfigureAwait(false);
+ return;
+ }
+
+ var oldChannel = _channel;
_channel = channel;
+
+ if (oldChannel != null)
+ await oldChannel.DisposeAsync().ConfigureAwait(false);
}
private void ThrowIfDisposed()
diff --git a/src/DotPulsar/Internal/ReaderProcess.cs b/src/DotPulsar/Internal/ReaderProcess.cs
index f5ef119..9b9a438 100644
--- a/src/DotPulsar/Internal/ReaderProcess.cs
+++ b/src/DotPulsar/Internal/ReaderProcess.cs
@@ -71,18 +71,8 @@ namespace DotPulsar.Internal
private async void SetupChannel()
{
- IReaderChannel? channel = null;
-
- try
- {
- channel = await _factory.Create(CancellationTokenSource.Token).ConfigureAwait(false);
- _reader.SetChannel(channel);
- }
- catch
- {
- if (channel != null)
- await channel.DisposeAsync().ConfigureAwait(false);
- }
+ var channel = await _factory.Create(CancellationTokenSource.Token).ConfigureAwait(false);
+ await _reader.SetChannel(channel).ConfigureAwait(false);
}
}
}