You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by bl...@apache.org on 2020/03/25 07:01:43 UTC
[pulsar-dotpulsar] branch master updated: Remove Fody and manually
add .ConfigureAwait(false) to all async/awaits.
This is an automated email from the ASF dual-hosted git repository.
blankensteiner pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-dotpulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 53bba0d Remove Fody and manually add .ConfigureAwait(false) to all async/awaits.
53bba0d is described below
commit 53bba0db6175adf351bfe92e67060961ccaf87b8
Author: Jan-Pieter George <ja...@gmail.com>
AuthorDate: Wed Mar 25 07:29:47 2020 +0100
Remove Fody and manually add .ConfigureAwait(false) to all async/awaits.
---
samples/Consuming/Program.cs | 10 ++--
samples/Producing/Program.cs | 12 ++--
samples/Reading/Program.cs | 8 +--
src/DotPulsar.Stress.Tests/ConsumerTests.cs | 8 +--
.../XunitExceptionHandler.cs | 2 +-
src/DotPulsar.Tests/Internal/AsyncLockTests.cs | 36 ++++++------
src/DotPulsar.Tests/Internal/AsyncQueueTests.cs | 16 ++---
src/DotPulsar.Tests/Internal/StateManagerTests.cs | 2 +-
src/DotPulsar/DotPulsar.csproj | 7 +--
src/DotPulsar/FodyWeavers.xml | 4 --
src/DotPulsar/Internal/AsyncLock.cs | 2 +-
src/DotPulsar/Internal/AsyncLockExecutor.cs | 26 ++++-----
src/DotPulsar/Internal/Connection.cs | 68 +++++++++++-----------
src/DotPulsar/Internal/ConnectionPool.cs | 30 +++++-----
src/DotPulsar/Internal/Connector.cs | 10 ++--
src/DotPulsar/Internal/Consumer.cs | 24 ++++----
src/DotPulsar/Internal/ConsumerChannel.cs | 20 +++----
src/DotPulsar/Internal/ConsumerChannelFactory.cs | 6 +-
src/DotPulsar/Internal/ConsumerProcess.cs | 6 +-
src/DotPulsar/Internal/DefaultExceptionHandler.cs | 2 +-
src/DotPulsar/Internal/ExceptionHandlerPipeline.cs | 2 +-
src/DotPulsar/Internal/Executor.cs | 22 +++----
src/DotPulsar/Internal/MessageBuilder.cs | 2 +-
src/DotPulsar/Internal/ProcessManager.cs | 6 +-
src/DotPulsar/Internal/Producer.cs | 18 +++---
src/DotPulsar/Internal/ProducerChannel.cs | 8 +--
src/DotPulsar/Internal/ProducerChannelFactory.cs | 6 +-
src/DotPulsar/Internal/ProducerProcess.cs | 6 +-
src/DotPulsar/Internal/PulsarStream.cs | 14 ++---
src/DotPulsar/Internal/Reader.cs | 8 +--
src/DotPulsar/Internal/ReaderChannelFactory.cs | 6 +-
src/DotPulsar/Internal/ReaderProcess.cs | 6 +-
src/DotPulsar/PulsarClient.cs | 2 +-
33 files changed, 198 insertions(+), 207 deletions(-)
diff --git a/samples/Consuming/Program.cs b/samples/Consuming/Program.cs
index 1fa82a5..5b58a14 100644
--- a/samples/Consuming/Program.cs
+++ b/samples/Consuming/Program.cs
@@ -48,11 +48,11 @@ namespace Consuming
cts.Cancel();
- await consuming;
+ await consuming.ConfigureAwait(false);
- await consumer.DisposeAsync();
+ await consumer.DisposeAsync().ConfigureAwait(false);
- await monitoring;
+ await monitoring.ConfigureAwait(false);
}
private static async Task ConsumeMessages(IConsumer consumer, CancellationToken cancellationToken)
@@ -65,7 +65,7 @@ namespace Consuming
{
var data = Encoding.UTF8.GetString(message.Data.ToArray());
Console.WriteLine("Received: " + data);
- await consumer.Acknowledge(message, cancellationToken);
+ await consumer.Acknowledge(message, cancellationToken).ConfigureAwait(false);
}
}
catch(OperationCanceledException)
@@ -82,7 +82,7 @@ namespace Consuming
while (true)
{
- state = await consumer.StateChangedFrom(state);
+ state = await consumer.StateChangedFrom(state).ConfigureAwait(false);
var stateMessage = state switch
{
diff --git a/samples/Producing/Program.cs b/samples/Producing/Program.cs
index 2c7c082..b742d02 100644
--- a/samples/Producing/Program.cs
+++ b/samples/Producing/Program.cs
@@ -44,11 +44,11 @@ namespace Producing
cts.Cancel();
- await producing;
+ await producing.ConfigureAwait(false);
- await producer.DisposeAsync();
+ await producer.DisposeAsync().ConfigureAwait(false);
- await monitoring;
+ await monitoring.ConfigureAwait(false);
}
private static async Task ProduceMessages(IProducer producer, CancellationToken cancellationToken)
@@ -62,8 +62,8 @@ namespace Producing
while (!cancellationToken.IsCancellationRequested)
{
var data = Encoding.UTF8.GetBytes("Sent " + DateTime.UtcNow.ToString());
- _ = await producer.Send(data, cancellationToken);
- await Task.Delay(delay);
+ _ = await producer.Send(data, cancellationToken).ConfigureAwait(false);
+ await Task.Delay(delay).ConfigureAwait(false);
}
}
catch (OperationCanceledException) // If not using the cancellationToken, then just dispose the producer and catch ObjectDisposedException instead
@@ -80,7 +80,7 @@ namespace Producing
while (true)
{
- state = await producer.StateChangedFrom(state);
+ state = await producer.StateChangedFrom(state).ConfigureAwait(false);
var stateMessage = state switch
{
diff --git a/samples/Reading/Program.cs b/samples/Reading/Program.cs
index 3c12917..515cf60 100644
--- a/samples/Reading/Program.cs
+++ b/samples/Reading/Program.cs
@@ -49,11 +49,11 @@ namespace Reading
cts.Cancel();
- await reading;
+ await reading.ConfigureAwait(false);
- await reader.DisposeAsync();
+ await reader.DisposeAsync().ConfigureAwait(false);
- await monitoring;
+ await monitoring.ConfigureAwait(false);
}
private static async Task ReadMessages(IReader reader, CancellationToken cancellationToken)
@@ -82,7 +82,7 @@ namespace Reading
while (true)
{
- state = await reader.StateChangedFrom(state);
+ state = await reader.StateChangedFrom(state).ConfigureAwait(false);
var stateMessage = state switch
{
diff --git a/src/DotPulsar.Stress.Tests/ConsumerTests.cs b/src/DotPulsar.Stress.Tests/ConsumerTests.cs
index 2517413..260fe06 100644
--- a/src/DotPulsar.Stress.Tests/ConsumerTests.cs
+++ b/src/DotPulsar.Stress.Tests/ConsumerTests.cs
@@ -61,15 +61,15 @@ namespace DotPulsar.Stress.Tests
var consume = ConsumeMessages(cts.Token);
var produce = ProduceMessages(cts.Token);
- var consumed = await consume;
- var produced = await produce;
+ var consumed = await consume.ConfigureAwait(false);
+ var produced = await produce.ConfigureAwait(false);
//Assert
consumed.Should().BeEquivalentTo(produced);
Task<MessageId[]> ProduceMessages(CancellationToken ct)
=> Enumerable.Range(1, numberOfMessages)
- .Select(async n => await producer.Send(Encoding.UTF8.GetBytes($"Sent #{n} at {DateTimeOffset.UtcNow:s}"), ct))
+ .Select(async n => await producer.Send(Encoding.UTF8.GetBytes($"Sent #{n} at {DateTimeOffset.UtcNow:s}"), ct).ConfigureAwait(false))
.WhenAll();
async Task<List<MessageId>> ConsumeMessages(CancellationToken ct)
@@ -82,7 +82,7 @@ namespace DotPulsar.Stress.Tests
if (ids.Count != numberOfMessages) continue;
- await consumer.AcknowledgeCumulative(message, ct);
+ await consumer.AcknowledgeCumulative(message, ct).ConfigureAwait(false);
break;
}
diff --git a/src/DotPulsar.Stress.Tests/XunitExceptionHandler.cs b/src/DotPulsar.Stress.Tests/XunitExceptionHandler.cs
index d8fdde3..2a75d3c 100644
--- a/src/DotPulsar.Stress.Tests/XunitExceptionHandler.cs
+++ b/src/DotPulsar.Stress.Tests/XunitExceptionHandler.cs
@@ -37,7 +37,7 @@ namespace DotPulsar.Stress.Tests
public async ValueTask OnException(ExceptionContext exceptionContext)
{
- await _exceptionHandler.OnException(exceptionContext);
+ await _exceptionHandler.OnException(exceptionContext).ConfigureAwait(false);
if (!exceptionContext.ExceptionHandled)
_output.WriteLine(
diff --git a/src/DotPulsar.Tests/Internal/AsyncLockTests.cs b/src/DotPulsar.Tests/Internal/AsyncLockTests.cs
index fa151e8..adce63f 100644
--- a/src/DotPulsar.Tests/Internal/AsyncLockTests.cs
+++ b/src/DotPulsar.Tests/Internal/AsyncLockTests.cs
@@ -37,7 +37,7 @@ namespace DotPulsar.Tests.Internal
//Annihilate
actual.Result.Dispose();
- await sut.DisposeAsync();
+ await sut.DisposeAsync().ConfigureAwait(false);
}
[Fact]
@@ -45,7 +45,7 @@ namespace DotPulsar.Tests.Internal
{
//Arrange
var sut = new AsyncLock();
- var alreadyTaken = await sut.Lock(CancellationToken.None);
+ var alreadyTaken = await sut.Lock(CancellationToken.None).ConfigureAwait(false);
//Act
var actual = sut.Lock(CancellationToken.None);
@@ -56,7 +56,7 @@ namespace DotPulsar.Tests.Internal
//Annihilate
alreadyTaken.Dispose();
actual.Result.Dispose();
- await sut.DisposeAsync();
+ await sut.DisposeAsync().ConfigureAwait(false);
}
[Fact]
@@ -64,10 +64,10 @@ namespace DotPulsar.Tests.Internal
{
//Arrange
var sut = new AsyncLock();
- await sut.DisposeAsync();
+ await sut.DisposeAsync().ConfigureAwait(false);
//Act
- var exception = await Record.ExceptionAsync(() => sut.Lock(CancellationToken.None));
+ var exception = await Record.ExceptionAsync(() => sut.Lock(CancellationToken.None)).ConfigureAwait(false);
//Assert
Assert.IsType<AsyncLockDisposedException>(exception);
@@ -78,18 +78,18 @@ namespace DotPulsar.Tests.Internal
{
//Arrange
var sut = new AsyncLock();
- var gotLock = await sut.Lock(CancellationToken.None);
+ var gotLock = await sut.Lock(CancellationToken.None).ConfigureAwait(false);
var awaiting = sut.Lock(CancellationToken.None);
- _ = Task.Run(async () => await sut.DisposeAsync());
+ _ = Task.Run(async () => await sut.DisposeAsync().ConfigureAwait(false));
//Act
- var exception = await Record.ExceptionAsync(() => awaiting);
+ var exception = await Record.ExceptionAsync(() => awaiting).ConfigureAwait(false);
//Assert
Assert.IsType<TaskCanceledException>(exception);
//Annihilate
- await sut.DisposeAsync();
+ await sut.DisposeAsync().ConfigureAwait(false);
gotLock.Dispose();
}
@@ -99,12 +99,12 @@ namespace DotPulsar.Tests.Internal
//Arrange
var cts = new CancellationTokenSource();
var sut = new AsyncLock();
- var gotLock = await sut.Lock(CancellationToken.None);
+ var gotLock = await sut.Lock(CancellationToken.None).ConfigureAwait(false);
var awaiting = sut.Lock(cts.Token);
//Act
cts.Cancel();
- var exception = await Record.ExceptionAsync(() => awaiting);
+ var exception = await Record.ExceptionAsync(() => awaiting).ConfigureAwait(false);
//Assert
Assert.IsType<TaskCanceledException>(exception);
@@ -112,7 +112,7 @@ namespace DotPulsar.Tests.Internal
//Annihilate
cts.Dispose();
gotLock.Dispose();
- await sut.DisposeAsync();
+ await sut.DisposeAsync().ConfigureAwait(false);
}
[Fact]
@@ -120,19 +120,19 @@ namespace DotPulsar.Tests.Internal
{
//Arrange
var sut = new AsyncLock();
- var gotLock = await sut.Lock(CancellationToken.None);
- var disposeTask = Task.Run(async () => await sut.DisposeAsync());
+ var gotLock = await sut.Lock(CancellationToken.None).ConfigureAwait(false);
+ var disposeTask = Task.Run(async () => await sut.DisposeAsync().ConfigureAwait(false));
Assert.False(disposeTask.IsCompleted);
//Act
gotLock.Dispose();
- await disposeTask;
+ await disposeTask.ConfigureAwait(false);
//Assert
Assert.True(disposeTask.IsCompleted);
//Annihilate
- await sut.DisposeAsync();
+ await sut.DisposeAsync().ConfigureAwait(false);
}
[Fact]
@@ -142,8 +142,8 @@ namespace DotPulsar.Tests.Internal
var sut = new AsyncLock();
//Act
- await sut.DisposeAsync();
- var exception = await Record.ExceptionAsync(() => sut.DisposeAsync().AsTask()); // xUnit can't record ValueTask yet
+ await sut.DisposeAsync().ConfigureAwait(false);
+ var exception = await Record.ExceptionAsync(() => sut.DisposeAsync().AsTask()).ConfigureAwait(false); // xUnit can't record ValueTask yet
//Assert
Assert.Null(exception);
diff --git a/src/DotPulsar.Tests/Internal/AsyncQueueTests.cs b/src/DotPulsar.Tests/Internal/AsyncQueueTests.cs
index a0bce74..290932c 100644
--- a/src/DotPulsar.Tests/Internal/AsyncQueueTests.cs
+++ b/src/DotPulsar.Tests/Internal/AsyncQueueTests.cs
@@ -31,7 +31,7 @@ namespace DotPulsar.Tests.Internal
queue.Enqueue(value);
//Act
- var actual = await dequeueTask;
+ var actual = await dequeueTask.ConfigureAwait(false);
//Assert
Assert.Equal(value, actual);
@@ -49,7 +49,7 @@ namespace DotPulsar.Tests.Internal
queue.Enqueue(value);
//Act
- var actual = await queue.Dequeue();
+ var actual = await queue.Dequeue().ConfigureAwait(false);
//Assert
Assert.Equal(value, actual);
@@ -70,8 +70,8 @@ namespace DotPulsar.Tests.Internal
queue.Enqueue(value2);
//Act
- var actual1 = await dequeue1;
- var actual2 = await dequeue2;
+ var actual1 = await dequeue1.ConfigureAwait(false);
+ var actual2 = await dequeue2.ConfigureAwait(false);
//Assert
Assert.Equal(value1, actual1);
@@ -91,8 +91,8 @@ namespace DotPulsar.Tests.Internal
queue.Enqueue(value2);
//Act
- var actual1 = await queue.Dequeue();
- var actual2 = await queue.Dequeue();
+ var actual1 = await queue.Dequeue().ConfigureAwait(false);
+ var actual2 = await queue.Dequeue().ConfigureAwait(false);
//Assert
Assert.Equal(value1, actual1);
@@ -115,8 +115,8 @@ namespace DotPulsar.Tests.Internal
//Act
source1.Cancel();
queue.Enqueue(excepted);
- var exception = await Record.ExceptionAsync(() => task1.AsTask()); // xUnit can't record ValueTask yet
- await task2;
+ var exception = await Record.ExceptionAsync(() => task1.AsTask()).ConfigureAwait(false); // xUnit can't record ValueTask yet
+ await task2.ConfigureAwait(false);
//Assert
Assert.IsType<TaskCanceledException>(exception);
diff --git a/src/DotPulsar.Tests/Internal/StateManagerTests.cs b/src/DotPulsar.Tests/Internal/StateManagerTests.cs
index 7323446..152b55b 100644
--- a/src/DotPulsar.Tests/Internal/StateManagerTests.cs
+++ b/src/DotPulsar.Tests/Internal/StateManagerTests.cs
@@ -233,7 +233,7 @@ namespace DotPulsar.Tests.Internal
//Act
cts.Cancel();
- var exception = await Record.ExceptionAsync(() => task.AsTask()); // xUnit can't record ValueTask yet
+ var exception = await Record.ExceptionAsync(() => task.AsTask()).ConfigureAwait(false); // xUnit can't record ValueTask yet
//Assert
Assert.IsType<TaskCanceledException>(exception);
diff --git a/src/DotPulsar/DotPulsar.csproj b/src/DotPulsar/DotPulsar.csproj
index 6fb1411..bb74643 100644
--- a/src/DotPulsar/DotPulsar.csproj
+++ b/src/DotPulsar/DotPulsar.csproj
@@ -20,12 +20,7 @@
<NoWarn>1591;1701;1702</NoWarn>
</PropertyGroup>
- <ItemGroup>
- <PackageReference Include="ConfigureAwait.Fody" Version="3.3.1" PrivateAssets="All" />
- <PackageReference Include="Fody" Version="6.1.1">
- <PrivateAssets>all</PrivateAssets>
- <IncludeAssets>runtime; build; native; contentfiles; analyzers; buildtransitive</IncludeAssets>
- </PackageReference>
+ <ItemGroup>
<PackageReference Include="Microsoft.SourceLink.GitHub" Version="1.0.0" PrivateAssets="All" />
<PackageReference Include="protobuf-net" Version="2.4.4" />
<PackageReference Include="System.IO.Pipelines" Version="4.7.0" />
diff --git a/src/DotPulsar/FodyWeavers.xml b/src/DotPulsar/FodyWeavers.xml
deleted file mode 100644
index b74c926..0000000
--- a/src/DotPulsar/FodyWeavers.xml
+++ /dev/null
@@ -1,4 +0,0 @@
-<?xml version="1.0" encoding="utf-8"?>
-<Weavers xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:noNamespaceSchemaLocation="FodyWeavers.xsd">
- <ConfigureAwait ContinueOnCapturedContext="false" />
-</Weavers>
\ No newline at end of file
diff --git a/src/DotPulsar/Internal/AsyncLock.cs b/src/DotPulsar/Internal/AsyncLock.cs
index 592030b..bd2d8c4 100644
--- a/src/DotPulsar/Internal/AsyncLock.cs
+++ b/src/DotPulsar/Internal/AsyncLock.cs
@@ -75,7 +75,7 @@ namespace DotPulsar.Internal
_pending.Clear();
}
- await _semaphoreSlim.WaitAsync(); //Wait for possible lock-holder to finish
+ await _semaphoreSlim.WaitAsync().ConfigureAwait(false); //Wait for possible lock-holder to finish
_semaphoreSlim.Release();
_semaphoreSlim.Dispose();
}
diff --git a/src/DotPulsar/Internal/AsyncLockExecutor.cs b/src/DotPulsar/Internal/AsyncLockExecutor.cs
index df9b452..98d2663 100644
--- a/src/DotPulsar/Internal/AsyncLockExecutor.cs
+++ b/src/DotPulsar/Internal/AsyncLockExecutor.cs
@@ -30,53 +30,53 @@ namespace DotPulsar.Internal
_executor = executor;
}
- public async ValueTask DisposeAsync() => await _lock.DisposeAsync();
+ public async ValueTask DisposeAsync() => await _lock.DisposeAsync().ConfigureAwait(false);
public async ValueTask Execute(Action action, CancellationToken cancellationToken)
{
- using (await _lock.Lock(cancellationToken))
+ using (await _lock.Lock(cancellationToken).ConfigureAwait(false))
{
- await _executor.Execute(action, cancellationToken);
+ await _executor.Execute(action, cancellationToken).ConfigureAwait(false);
}
}
public async ValueTask Execute(Func<Task> func, CancellationToken cancellationToken)
{
- using (await _lock.Lock(cancellationToken))
+ using (await _lock.Lock(cancellationToken).ConfigureAwait(false))
{
- await _executor.Execute(func, cancellationToken);
+ await _executor.Execute(func, cancellationToken).ConfigureAwait(false);
}
}
public async ValueTask Execute(Func<ValueTask> func, CancellationToken cancellationToken)
{
- using (await _lock.Lock(cancellationToken))
+ using (await _lock.Lock(cancellationToken).ConfigureAwait(false))
{
- await _executor.Execute(func, cancellationToken);
+ await _executor.Execute(func, cancellationToken).ConfigureAwait(false);
}
}
public async ValueTask<TResult> Execute<TResult>(Func<TResult> func, CancellationToken cancellationToken)
{
- using (await _lock.Lock(cancellationToken))
+ using (await _lock.Lock(cancellationToken).ConfigureAwait(false))
{
- return await _executor.Execute(func, cancellationToken);
+ return await _executor.Execute(func, cancellationToken).ConfigureAwait(false);
}
}
public async ValueTask<TResult> Execute<TResult>(Func<Task<TResult>> func, CancellationToken cancellationToken)
{
- using (await _lock.Lock(cancellationToken))
+ using (await _lock.Lock(cancellationToken).ConfigureAwait(false))
{
- return await _executor.Execute(func, cancellationToken);
+ return await _executor.Execute(func, cancellationToken).ConfigureAwait(false);
}
}
public async ValueTask<TResult> Execute<TResult>(Func<ValueTask<TResult>> func, CancellationToken cancellationToken)
{
- using (await _lock.Lock(cancellationToken))
+ using (await _lock.Lock(cancellationToken).ConfigureAwait(false))
{
- return await _executor.Execute(func, cancellationToken);
+ return await _executor.Execute(func, cancellationToken).ConfigureAwait(false);
}
}
}
diff --git a/src/DotPulsar/Internal/Connection.cs b/src/DotPulsar/Internal/Connection.cs
index 4e168df..b5a2372 100644
--- a/src/DotPulsar/Internal/Connection.cs
+++ b/src/DotPulsar/Internal/Connection.cs
@@ -43,7 +43,7 @@ namespace DotPulsar.Internal
{
ThrowIfDisposed();
- using (await _lock.Lock(cancellationToken))
+ using (await _lock.Lock(cancellationToken).ConfigureAwait(false))
{
return _channelManager.HasChannels();
}
@@ -55,16 +55,16 @@ namespace DotPulsar.Internal
Task<ProducerResponse>? responseTask = null;
- using (await _lock.Lock(cancellationToken))
+ using (await _lock.Lock(cancellationToken).ConfigureAwait(false))
{
var baseCommand = command.AsBaseCommand();
var requestResponseTask = _requestResponseHandler.Outgoing(baseCommand);
responseTask = _channelManager.Outgoing(command, requestResponseTask, channel);
var sequence = Serializer.Serialize(baseCommand);
- await _stream.Send(sequence);
+ await _stream.Send(sequence).ConfigureAwait(false);
}
- return await responseTask;
+ return await responseTask.ConfigureAwait(false);
}
public async Task<SubscribeResponse> Send(CommandSubscribe command, IChannel channel, CancellationToken cancellationToken)
@@ -73,29 +73,29 @@ namespace DotPulsar.Internal
Task<SubscribeResponse>? responseTask = null;
- using (await _lock.Lock(cancellationToken))
+ using (await _lock.Lock(cancellationToken).ConfigureAwait(false))
{
var baseCommand = command.AsBaseCommand();
var requestResponseTask = _requestResponseHandler.Outgoing(baseCommand);
responseTask = _channelManager.Outgoing(command, requestResponseTask, channel);
var sequence = Serializer.Serialize(baseCommand);
- await _stream.Send(sequence);
+ await _stream.Send(sequence).ConfigureAwait(false);
}
- return await responseTask;
+ return await responseTask.ConfigureAwait(false);
}
public async Task Send(CommandPing command, CancellationToken cancellationToken) =>
- await Send(command.AsBaseCommand(), cancellationToken);
+ await Send(command.AsBaseCommand(), cancellationToken).ConfigureAwait(false);
public async Task Send(CommandPong command, CancellationToken cancellationToken) =>
- await Send(command.AsBaseCommand(), cancellationToken);
+ await Send(command.AsBaseCommand(), cancellationToken).ConfigureAwait(false);
public async Task Send(CommandAck command, CancellationToken cancellationToken) =>
- await Send(command.AsBaseCommand(), cancellationToken);
+ await Send(command.AsBaseCommand(), cancellationToken).ConfigureAwait(false);
public async Task Send(CommandFlow command, CancellationToken cancellationToken) =>
- await Send(command.AsBaseCommand(), cancellationToken);
+ await Send(command.AsBaseCommand(), cancellationToken).ConfigureAwait(false);
public async Task<BaseCommand> Send(CommandUnsubscribe command, CancellationToken cancellationToken)
{
@@ -103,29 +103,29 @@ namespace DotPulsar.Internal
Task<BaseCommand>? responseTask = null;
- using (await _lock.Lock(cancellationToken))
+ using (await _lock.Lock(cancellationToken).ConfigureAwait(false))
{
var baseCommand = command.AsBaseCommand();
responseTask = _requestResponseHandler.Outgoing(baseCommand);
_channelManager.Outgoing(command, responseTask);
var sequence = Serializer.Serialize(baseCommand);
- await _stream.Send(sequence);
+ await _stream.Send(sequence).ConfigureAwait(false);
}
- return await responseTask;
+ return await responseTask.ConfigureAwait(false);
}
public async Task<BaseCommand> Send(CommandConnect command, CancellationToken cancellationToken) =>
- await SendRequestResponse(command.AsBaseCommand(), cancellationToken);
+ await SendRequestResponse(command.AsBaseCommand(), cancellationToken).ConfigureAwait(false);
public async Task<BaseCommand> Send(CommandLookupTopic command, CancellationToken cancellationToken) =>
- await SendRequestResponse(command.AsBaseCommand(), cancellationToken);
+ await SendRequestResponse(command.AsBaseCommand(), cancellationToken).ConfigureAwait(false);
public async Task<BaseCommand> Send(CommandSeek command, CancellationToken cancellationToken) =>
- await SendRequestResponse(command.AsBaseCommand(), cancellationToken);
+ await SendRequestResponse(command.AsBaseCommand(), cancellationToken).ConfigureAwait(false);
public async Task<BaseCommand> Send(CommandGetLastMessageId command, CancellationToken cancellationToken) =>
- await SendRequestResponse(command.AsBaseCommand(), cancellationToken);
+ await SendRequestResponse(command.AsBaseCommand(), cancellationToken).ConfigureAwait(false);
public async Task<BaseCommand> Send(CommandCloseProducer command, CancellationToken cancellationToken)
{
@@ -133,16 +133,16 @@ namespace DotPulsar.Internal
Task<BaseCommand>? responseTask = null;
- using (await _lock.Lock(cancellationToken))
+ using (await _lock.Lock(cancellationToken).ConfigureAwait(false))
{
var baseCommand = command.AsBaseCommand();
responseTask = _requestResponseHandler.Outgoing(baseCommand);
_channelManager.Outgoing(command, responseTask);
var sequence = Serializer.Serialize(baseCommand);
- await _stream.Send(sequence);
+ await _stream.Send(sequence).ConfigureAwait(false);
}
- return await responseTask;
+ return await responseTask.ConfigureAwait(false);
}
public async Task<BaseCommand> Send(CommandCloseConsumer command, CancellationToken cancellationToken)
@@ -151,16 +151,16 @@ namespace DotPulsar.Internal
Task<BaseCommand>? responseTask = null;
- using (await _lock.Lock(cancellationToken))
+ using (await _lock.Lock(cancellationToken).ConfigureAwait(false))
{
var baseCommand = command.AsBaseCommand();
responseTask = _requestResponseHandler.Outgoing(baseCommand);
_channelManager.Outgoing(command, responseTask);
var sequence = Serializer.Serialize(baseCommand);
- await _stream.Send(sequence);
+ await _stream.Send(sequence).ConfigureAwait(false);
}
- return await responseTask;
+ return await responseTask.ConfigureAwait(false);
}
public async Task<BaseCommand> Send(SendPackage command, CancellationToken cancellationToken)
@@ -169,15 +169,15 @@ namespace DotPulsar.Internal
Task<BaseCommand>? response = null;
- using (await _lock.Lock(cancellationToken))
+ using (await _lock.Lock(cancellationToken).ConfigureAwait(false))
{
var baseCommand = command.Command.AsBaseCommand();
response = _requestResponseHandler.Outgoing(baseCommand);
var sequence = Serializer.Serialize(baseCommand, command.Metadata, command.Payload);
- await _stream.Send(sequence);
+ await _stream.Send(sequence).ConfigureAwait(false);
}
- return await response;
+ return await response.ConfigureAwait(false);
}
private async Task<BaseCommand> SendRequestResponse(BaseCommand command, CancellationToken cancellationToken)
@@ -186,24 +186,24 @@ namespace DotPulsar.Internal
Task<BaseCommand>? response = null;
- using (await _lock.Lock(cancellationToken))
+ using (await _lock.Lock(cancellationToken).ConfigureAwait(false))
{
response = _requestResponseHandler.Outgoing(command);
var sequence = Serializer.Serialize(command);
- await _stream.Send(sequence);
+ await _stream.Send(sequence).ConfigureAwait(false);
}
- return await response;
+ return await response.ConfigureAwait(false);
}
private async Task Send(BaseCommand command, CancellationToken cancellationToken)
{
ThrowIfDisposed();
- using (await _lock.Lock(cancellationToken))
+ using (await _lock.Lock(cancellationToken).ConfigureAwait(false))
{
var sequence = Serializer.Serialize(command);
- await _stream.Send(sequence);
+ await _stream.Send(sequence).ConfigureAwait(false);
}
}
@@ -252,10 +252,10 @@ namespace DotPulsar.Internal
if (Interlocked.Exchange(ref _isDisposed, 1) != 0)
return;
- await _lock.DisposeAsync();
+ await _lock.DisposeAsync().ConfigureAwait(false);
_requestResponseHandler.Dispose();
_channelManager.Dispose();
- await _stream.DisposeAsync();
+ await _stream.DisposeAsync().ConfigureAwait(false);
}
private void ThrowIfDisposed()
diff --git a/src/DotPulsar/Internal/ConnectionPool.cs b/src/DotPulsar/Internal/ConnectionPool.cs
index de178c7..c4159de 100644
--- a/src/DotPulsar/Internal/ConnectionPool.cs
+++ b/src/DotPulsar/Internal/ConnectionPool.cs
@@ -51,13 +51,13 @@ namespace DotPulsar.Internal
public async ValueTask DisposeAsync()
{
_cancellationTokenSource.Cancel();
- await _closeInactiveConnections;
+ await _closeInactiveConnections.ConfigureAwait(false);
- await _lock.DisposeAsync();
+ await _lock.DisposeAsync().ConfigureAwait(false);
foreach (var serviceUrl in _connections.Keys.ToArray())
{
- await DisposeConnection(serviceUrl);
+ await DisposeConnection(serviceUrl).ConfigureAwait(false);
}
}
@@ -74,8 +74,8 @@ namespace DotPulsar.Internal
while (true)
{
- var connection = await GetConnection(logicalUrl, physicalUrl, cancellationToken);
- var response = await connection.Send(lookup, cancellationToken);
+ var connection = await GetConnection(logicalUrl, physicalUrl, cancellationToken).ConfigureAwait(false);
+ var response = await connection.Send(lookup, cancellationToken).ConfigureAwait(false);
response.Expect(BaseCommand.Type.LookupResponse);
@@ -89,7 +89,7 @@ namespace DotPulsar.Internal
if (response.LookupTopicResponse.Response == CommandLookupTopicResponse.LookupType.Redirect || !response.LookupTopicResponse.Authoritative)
continue;
- return await GetConnection(logicalUrl, physicalUrl, cancellationToken);
+ return await GetConnection(logicalUrl, physicalUrl, cancellationToken).ConfigureAwait(false);
}
}
@@ -122,18 +122,18 @@ namespace DotPulsar.Internal
// the topic lookup.
private async ValueTask<Connection> GetConnection(Uri logicalUrl, Uri physicalUrl, CancellationToken cancellationToken)
{
- using (await _lock.Lock(cancellationToken))
+ using (await _lock.Lock(cancellationToken).ConfigureAwait(false))
{
if (_connections.TryGetValue(logicalUrl, out Connection connection))
return connection;
- return await EstablishNewConnection(logicalUrl, physicalUrl, cancellationToken);
+ return await EstablishNewConnection(logicalUrl, physicalUrl, cancellationToken).ConfigureAwait(false);
}
}
private async Task<Connection> EstablishNewConnection(Uri logicalUrl, Uri physicalUrl, CancellationToken cancellationToken)
{
- var stream = await _connector.Connect(physicalUrl);
+ var stream = await _connector.Connect(physicalUrl).ConfigureAwait(false);
var connection = new Connection(new PulsarStream(stream));
DotPulsarEventSource.Log.ConnectionCreated();
_connections[logicalUrl] = connection;
@@ -145,7 +145,7 @@ namespace DotPulsar.Internal
_commandConnect.ProxyToBrokerUrl = $"{logicalUrl.Host}:{logicalUrl.Port}";
}
- var response = await connection.Send(_commandConnect, cancellationToken);
+ var response = await connection.Send(_commandConnect, cancellationToken).ConfigureAwait(false);
response.Expect(BaseCommand.Type.Connected);
_commandConnect.ResetProxyToBrokerUrl(); // reset so we can re-use this object
@@ -157,7 +157,7 @@ namespace DotPulsar.Internal
{
if (_connections.TryRemove(logicalUrl, out var connection))
{
- await connection.DisposeAsync();
+ await connection.DisposeAsync().ConfigureAwait(false);
DotPulsarEventSource.Log.ConnectionDisposed();
}
}
@@ -168,9 +168,9 @@ namespace DotPulsar.Internal
{
try
{
- await Task.Delay(interval, cancellationToken);
+ await Task.Delay(interval, cancellationToken).ConfigureAwait(false);
- using (await _lock.Lock(cancellationToken))
+ using (await _lock.Lock(cancellationToken).ConfigureAwait(false))
{
var serviceUrls = _connections.Keys;
foreach (var serviceUrl in serviceUrls)
@@ -178,8 +178,8 @@ namespace DotPulsar.Internal
var connection = _connections[serviceUrl];
if (connection is null)
continue;
- if (!await connection.HasChannels(cancellationToken))
- await DisposeConnection(serviceUrl);
+ if (!await connection.HasChannels(cancellationToken).ConfigureAwait(false))
+ await DisposeConnection(serviceUrl).ConfigureAwait(false);
}
}
}
diff --git a/src/DotPulsar/Internal/Connector.cs b/src/DotPulsar/Internal/Connector.cs
index aa8365e..106d2cc 100644
--- a/src/DotPulsar/Internal/Connector.cs
+++ b/src/DotPulsar/Internal/Connector.cs
@@ -48,10 +48,10 @@ namespace DotPulsar.Internal
if (port == -1)
port = encrypt ? Constants.DefaultPulsarSSLPort : Constants.DefaultPulsarPort;
- var stream = await GetStream(host, port);
+ var stream = await GetStream(host, port).ConfigureAwait(false);
if (encrypt)
- stream = await EncryptStream(stream, host);
+ stream = await EncryptStream(stream, host).ConfigureAwait(false);
return stream;
}
@@ -65,9 +65,9 @@ namespace DotPulsar.Internal
var type = Uri.CheckHostName(host);
if (type == UriHostNameType.IPv4 || type == UriHostNameType.IPv6)
- await tcpClient.ConnectAsync(IPAddress.Parse(host), port);
+ await tcpClient.ConnectAsync(IPAddress.Parse(host), port).ConfigureAwait(false);
else
- await tcpClient.ConnectAsync(host, port);
+ await tcpClient.ConnectAsync(host, port).ConfigureAwait(false);
return tcpClient.GetStream();
}
@@ -85,7 +85,7 @@ namespace DotPulsar.Internal
try
{
sslStream = new SslStream(stream, false, new RemoteCertificateValidationCallback(ValidateServerCertificate), null);
- await sslStream.AuthenticateAsClientAsync(host, _clientCertificates, SslProtocols.None, true);
+ await sslStream.AuthenticateAsClientAsync(host, _clientCertificates, SslProtocols.None, true).ConfigureAwait(false);
return sslStream;
}
catch
diff --git a/src/DotPulsar/Internal/Consumer.cs b/src/DotPulsar/Internal/Consumer.cs
index 2f576b7..d239d83 100644
--- a/src/DotPulsar/Internal/Consumer.cs
+++ b/src/DotPulsar/Internal/Consumer.cs
@@ -54,10 +54,10 @@ namespace DotPulsar.Internal
}
public async ValueTask<ConsumerState> StateChangedTo(ConsumerState state, CancellationToken cancellationToken)
- => await _state.StateChangedTo(state, cancellationToken);
+ => await _state.StateChangedTo(state, cancellationToken).ConfigureAwait(false);
public async ValueTask<ConsumerState> StateChangedFrom(ConsumerState state, CancellationToken cancellationToken)
- => await _state.StateChangedFrom(state, cancellationToken);
+ => await _state.StateChangedFrom(state, cancellationToken).ConfigureAwait(false);
public bool IsFinalState() => _state.IsFinalState();
@@ -69,7 +69,7 @@ namespace DotPulsar.Internal
return;
_eventRegister.Register(new ConsumerDisposed(_correlationId, this));
- await _channel.DisposeAsync();
+ await _channel.DisposeAsync().ConfigureAwait(false);
}
public async IAsyncEnumerable<Message> Messages([EnumeratorCancellation] CancellationToken cancellationToken)
@@ -78,40 +78,40 @@ namespace DotPulsar.Internal
while (!cancellationToken.IsCancellationRequested)
{
- yield return await _executor.Execute(() => _channel.Receive(cancellationToken), cancellationToken);
+ yield return await _executor.Execute(() => _channel.Receive(cancellationToken), cancellationToken).ConfigureAwait(false);
}
}
public async ValueTask Acknowledge(Message message, CancellationToken cancellationToken)
- => await Acknowledge(message.MessageId.Data, CommandAck.AckType.Individual, cancellationToken);
+ => await Acknowledge(message.MessageId.Data, CommandAck.AckType.Individual, cancellationToken).ConfigureAwait(false);
public async ValueTask Acknowledge(MessageId messageId, CancellationToken cancellationToken)
- => await Acknowledge(messageId.Data, CommandAck.AckType.Individual, cancellationToken);
+ => await Acknowledge(messageId.Data, CommandAck.AckType.Individual, cancellationToken).ConfigureAwait(false);
public async ValueTask AcknowledgeCumulative(Message message, CancellationToken cancellationToken)
- => await Acknowledge(message.MessageId.Data, CommandAck.AckType.Cumulative, cancellationToken);
+ => await Acknowledge(message.MessageId.Data, CommandAck.AckType.Cumulative, cancellationToken).ConfigureAwait(false);
public async ValueTask AcknowledgeCumulative(MessageId messageId, CancellationToken cancellationToken)
- => await Acknowledge(messageId.Data, CommandAck.AckType.Cumulative, cancellationToken);
+ => await Acknowledge(messageId.Data, CommandAck.AckType.Cumulative, cancellationToken).ConfigureAwait(false);
public async ValueTask Unsubscribe(CancellationToken cancellationToken)
{
ThrowIfDisposed();
- _ = await _executor.Execute(() => _channel.Send(new CommandUnsubscribe(), cancellationToken), cancellationToken);
+ _ = await _executor.Execute(() => _channel.Send(new CommandUnsubscribe(), cancellationToken), cancellationToken).ConfigureAwait(false);
}
public async ValueTask Seek(MessageId messageId, CancellationToken cancellationToken)
{
ThrowIfDisposed();
var seek = new CommandSeek { MessageId = messageId.Data };
- _ = await _executor.Execute(() => _channel.Send(seek, cancellationToken), cancellationToken);
+ _ = await _executor.Execute(() => _channel.Send(seek, cancellationToken), cancellationToken).ConfigureAwait(false);
return;
}
public async ValueTask<MessageId> GetLastMessageId(CancellationToken cancellationToken)
{
ThrowIfDisposed();
- var response = await _executor.Execute(() => _channel.Send(new CommandGetLastMessageId(), cancellationToken), cancellationToken);
+ var response = await _executor.Execute(() => _channel.Send(new CommandGetLastMessageId(), cancellationToken), cancellationToken).ConfigureAwait(false);
return new MessageId(response.LastMessageId);
}
@@ -124,7 +124,7 @@ namespace DotPulsar.Internal
_cachedCommandAck.MessageIds.Clear();
_cachedCommandAck.MessageIds.Add(messageIdData);
return _channel.Send(_cachedCommandAck, cancellationToken);
- }, cancellationToken);
+ }, cancellationToken).ConfigureAwait(false);
}
internal void SetChannel(IConsumerChannel channel)
diff --git a/src/DotPulsar/Internal/ConsumerChannel.cs b/src/DotPulsar/Internal/ConsumerChannel.cs
index 735b078..9a05dce 100644
--- a/src/DotPulsar/Internal/ConsumerChannel.cs
+++ b/src/DotPulsar/Internal/ConsumerChannel.cs
@@ -52,7 +52,7 @@ namespace DotPulsar.Internal
while (true)
{
if (_sendWhenZero == 0)
- await SendFlow(cancellationToken);
+ await SendFlow(cancellationToken).ConfigureAwait(false);
_sendWhenZero--;
@@ -60,11 +60,11 @@ namespace DotPulsar.Internal
if (message != null)
return message;
- var messagePackage = await _queue.Dequeue(cancellationToken);
+ var messagePackage = await _queue.Dequeue(cancellationToken).ConfigureAwait(false);
if (!messagePackage.IsValid())
{
- await RejectPackage(messagePackage, cancellationToken);
+ await RejectPackage(messagePackage, cancellationToken).ConfigureAwait(false);
continue;
}
@@ -92,13 +92,13 @@ namespace DotPulsar.Internal
}
command.ConsumerId = _id;
- await _connection.Send(command, cancellationToken);
+ await _connection.Send(command, cancellationToken).ConfigureAwait(false);
}
public async Task<CommandSuccess> Send(CommandUnsubscribe command, CancellationToken cancellationToken)
{
command.ConsumerId = _id;
- var response = await _connection.Send(command, cancellationToken);
+ var response = await _connection.Send(command, cancellationToken).ConfigureAwait(false);
response.Expect(BaseCommand.Type.Success);
return response.Success;
}
@@ -106,7 +106,7 @@ namespace DotPulsar.Internal
public async Task<CommandSuccess> Send(CommandSeek command, CancellationToken cancellationToken)
{
command.ConsumerId = _id;
- var response = await _connection.Send(command, cancellationToken);
+ var response = await _connection.Send(command, cancellationToken).ConfigureAwait(false);
response.Expect(BaseCommand.Type.Success);
_batchHandler.Clear();
return response.Success;
@@ -115,7 +115,7 @@ namespace DotPulsar.Internal
public async Task<CommandGetLastMessageIdResponse> Send(CommandGetLastMessageId command, CancellationToken cancellationToken)
{
command.ConsumerId = _id;
- var response = await _connection.Send(command, cancellationToken);
+ var response = await _connection.Send(command, cancellationToken).ConfigureAwait(false);
response.Expect(BaseCommand.Type.GetLastMessageIdResponse);
return response.GetLastMessageIdResponse;
}
@@ -125,7 +125,7 @@ namespace DotPulsar.Internal
try
{
_queue.Dispose();
- await _connection.Send(new CommandCloseConsumer { ConsumerId = _id }, CancellationToken.None);
+ await _connection.Send(new CommandCloseConsumer { ConsumerId = _id }, CancellationToken.None).ConfigureAwait(false);
}
catch
{
@@ -136,7 +136,7 @@ namespace DotPulsar.Internal
private async ValueTask SendFlow(CancellationToken cancellationToken)
{
//TODO Should sending the flow command be handled on another thread and thereby not slow down the consumer?
- await _connection.Send(_cachedCommandFlow, cancellationToken);
+ await _connection.Send(_cachedCommandFlow, cancellationToken).ConfigureAwait(false);
if (_firstFlow)
{
@@ -157,7 +157,7 @@ namespace DotPulsar.Internal
ack.MessageIds.Add(messagePackage.MessageId);
- await Send(ack, cancellationToken);
+ await Send(ack, cancellationToken).ConfigureAwait(false);
}
}
}
diff --git a/src/DotPulsar/Internal/ConsumerChannelFactory.cs b/src/DotPulsar/Internal/ConsumerChannelFactory.cs
index 9746c67..aa07a40 100644
--- a/src/DotPulsar/Internal/ConsumerChannelFactory.cs
+++ b/src/DotPulsar/Internal/ConsumerChannelFactory.cs
@@ -58,14 +58,14 @@ namespace DotPulsar.Internal
}
public async Task<IConsumerChannel> Create(CancellationToken cancellationToken)
- => await _executor.Execute(() => GetChannel(cancellationToken), cancellationToken);
+ => await _executor.Execute(() => GetChannel(cancellationToken), cancellationToken).ConfigureAwait(false);
private async ValueTask<IConsumerChannel> GetChannel(CancellationToken cancellationToken)
{
- var connection = await _connectionPool.FindConnectionForTopic(_subscribe.Topic, cancellationToken);
+ var connection = await _connectionPool.FindConnectionForTopic(_subscribe.Topic, cancellationToken).ConfigureAwait(false);
var messageQueue = new AsyncQueue<MessagePackage>();
var channel = new Channel(_correlationId, _eventRegister, messageQueue);
- var response = await connection.Send(_subscribe, channel, cancellationToken);
+ var response = await connection.Send(_subscribe, channel, cancellationToken).ConfigureAwait(false);
return new ConsumerChannel(response.ConsumerId, _messagePrefetchCount, messageQueue, connection, _batchHandler);
}
}
diff --git a/src/DotPulsar/Internal/ConsumerProcess.cs b/src/DotPulsar/Internal/ConsumerProcess.cs
index ccc1169..af75d0f 100644
--- a/src/DotPulsar/Internal/ConsumerProcess.cs
+++ b/src/DotPulsar/Internal/ConsumerProcess.cs
@@ -42,7 +42,7 @@ namespace DotPulsar.Internal
{
_stateManager.SetState(ConsumerState.Closed);
CancellationTokenSource.Cancel();
- await _consumer.DisposeAsync();
+ await _consumer.DisposeAsync().ConfigureAwait(false);
}
protected override void CalculateState()
@@ -88,13 +88,13 @@ namespace DotPulsar.Internal
try
{
- channel = await _factory.Create(CancellationTokenSource.Token);
+ channel = await _factory.Create(CancellationTokenSource.Token).ConfigureAwait(false);
_consumer.SetChannel(channel);
}
catch
{
if (channel != null)
- await channel.DisposeAsync();
+ await channel.DisposeAsync().ConfigureAwait(false);
}
}
}
diff --git a/src/DotPulsar/Internal/DefaultExceptionHandler.cs b/src/DotPulsar/Internal/DefaultExceptionHandler.cs
index bf182b2..4c8b4f0 100644
--- a/src/DotPulsar/Internal/DefaultExceptionHandler.cs
+++ b/src/DotPulsar/Internal/DefaultExceptionHandler.cs
@@ -32,7 +32,7 @@ namespace DotPulsar.Internal
{
exceptionContext.Result = DetermineFaultAction(exceptionContext.Exception, exceptionContext.CancellationToken);
if (exceptionContext.Result == FaultAction.Retry)
- await Task.Delay(_retryInterval, exceptionContext.CancellationToken);
+ await Task.Delay(_retryInterval, exceptionContext.CancellationToken).ConfigureAwait(false);
exceptionContext.ExceptionHandled = true;
}
diff --git a/src/DotPulsar/Internal/ExceptionHandlerPipeline.cs b/src/DotPulsar/Internal/ExceptionHandlerPipeline.cs
index 11f825a..ebf164c 100644
--- a/src/DotPulsar/Internal/ExceptionHandlerPipeline.cs
+++ b/src/DotPulsar/Internal/ExceptionHandlerPipeline.cs
@@ -29,7 +29,7 @@ namespace DotPulsar.Internal
{
foreach (var handler in _handlers)
{
- await handler.OnException(exceptionContext);
+ await handler.OnException(exceptionContext).ConfigureAwait(false);
if (exceptionContext.ExceptionHandled)
break;
}
diff --git a/src/DotPulsar/Internal/Executor.cs b/src/DotPulsar/Internal/Executor.cs
index 1260cf3..f5ba80b 100644
--- a/src/DotPulsar/Internal/Executor.cs
+++ b/src/DotPulsar/Internal/Executor.cs
@@ -45,7 +45,7 @@ namespace DotPulsar.Internal
}
catch (Exception exception)
{
- if (await Handle(exception, cancellationToken))
+ if (await Handle(exception, cancellationToken).ConfigureAwait(false))
throw;
}
@@ -59,12 +59,12 @@ namespace DotPulsar.Internal
{
try
{
- await func();
+ await func().ConfigureAwait(false);
return;
}
catch (Exception exception)
{
- if (await Handle(exception, cancellationToken))
+ if (await Handle(exception, cancellationToken).ConfigureAwait(false))
throw;
}
@@ -78,12 +78,12 @@ namespace DotPulsar.Internal
{
try
{
- await func();
+ await func().ConfigureAwait(false);
return;
}
catch (Exception exception)
{
- if (await Handle(exception, cancellationToken))
+ if (await Handle(exception, cancellationToken).ConfigureAwait(false))
throw;
}
@@ -101,7 +101,7 @@ namespace DotPulsar.Internal
}
catch (Exception exception)
{
- if (await Handle(exception, cancellationToken))
+ if (await Handle(exception, cancellationToken).ConfigureAwait(false))
throw;
}
@@ -115,11 +115,11 @@ namespace DotPulsar.Internal
{
try
{
- return await func();
+ return await func().ConfigureAwait(false);
}
catch (Exception exception)
{
- if (await Handle(exception, cancellationToken))
+ if (await Handle(exception, cancellationToken).ConfigureAwait(false))
throw;
}
@@ -133,11 +133,11 @@ namespace DotPulsar.Internal
{
try
{
- return await func();
+ return await func().ConfigureAwait(false);
}
catch (Exception exception)
{
- if (await Handle(exception, cancellationToken))
+ if (await Handle(exception, cancellationToken).ConfigureAwait(false))
throw;
}
@@ -152,7 +152,7 @@ namespace DotPulsar.Internal
var context = new ExceptionContext(exception, cancellationToken);
- await _exceptionHandler.OnException(context);
+ await _exceptionHandler.OnException(context).ConfigureAwait(false);
if (context.Result != FaultAction.Retry)
_eventRegister.Register(new ExecutorFaulted(_correlationId));
diff --git a/src/DotPulsar/Internal/MessageBuilder.cs b/src/DotPulsar/Internal/MessageBuilder.cs
index 5f716d8..a5da955 100644
--- a/src/DotPulsar/Internal/MessageBuilder.cs
+++ b/src/DotPulsar/Internal/MessageBuilder.cs
@@ -86,6 +86,6 @@ namespace DotPulsar.Internal
}
public async ValueTask<MessageId> Send(ReadOnlyMemory<byte> data, CancellationToken cancellationToken)
- => await _producer.Send(_metadata, data, cancellationToken);
+ => await _producer.Send(_metadata, data, cancellationToken).ConfigureAwait(false);
}
}
diff --git a/src/DotPulsar/Internal/ProcessManager.cs b/src/DotPulsar/Internal/ProcessManager.cs
index d665033..0e8c7df 100644
--- a/src/DotPulsar/Internal/ProcessManager.cs
+++ b/src/DotPulsar/Internal/ProcessManager.cs
@@ -37,9 +37,9 @@ namespace DotPulsar.Internal
var processes = _processes.Values.ToArray();
for (var i = 0; i < processes.Length; ++i)
- await processes[i].DisposeAsync();
+ await processes[i].DisposeAsync().ConfigureAwait(false);
- await _connectionPool.DisposeAsync();
+ await _connectionPool.DisposeAsync().ConfigureAwait(false);
}
public void Add(IProcess process) => _processes[process.CorrelationId] = process;
@@ -47,7 +47,7 @@ namespace DotPulsar.Internal
private async void Remove(Guid correlationId)
{
if (_processes.TryRemove(correlationId, out IProcess process))
- await process.DisposeAsync();
+ await process.DisposeAsync().ConfigureAwait(false);
}
public void Register(IEvent e)
diff --git a/src/DotPulsar/Internal/Producer.cs b/src/DotPulsar/Internal/Producer.cs
index 9362a57..2f94e13 100644
--- a/src/DotPulsar/Internal/Producer.cs
+++ b/src/DotPulsar/Internal/Producer.cs
@@ -50,10 +50,10 @@ namespace DotPulsar.Internal
}
public async ValueTask<ProducerState> StateChangedTo(ProducerState state, CancellationToken cancellationToken)
- => await _state.StateChangedTo(state, cancellationToken);
+ => await _state.StateChangedTo(state, cancellationToken).ConfigureAwait(false);
public async ValueTask<ProducerState> StateChangedFrom(ProducerState state, CancellationToken cancellationToken)
- => await _state.StateChangedFrom(state, cancellationToken);
+ => await _state.StateChangedFrom(state, cancellationToken).ConfigureAwait(false);
public bool IsFinalState() => _state.IsFinalState();
@@ -65,32 +65,32 @@ namespace DotPulsar.Internal
return;
_eventRegister.Register(new ProducerDisposed(_correlationId, this));
- await _channel.DisposeAsync();
+ await _channel.DisposeAsync().ConfigureAwait(false);
}
public async ValueTask<MessageId> Send(byte[] data, CancellationToken cancellationToken)
- => await Send(new ReadOnlySequence<byte>(data), cancellationToken);
+ => await Send(new ReadOnlySequence<byte>(data), cancellationToken).ConfigureAwait(false);
public async ValueTask<MessageId> Send(ReadOnlyMemory<byte> data, CancellationToken cancellationToken)
- => await Send(new ReadOnlySequence<byte>(data), cancellationToken);
+ => await Send(new ReadOnlySequence<byte>(data), cancellationToken).ConfigureAwait(false);
public async ValueTask<MessageId> Send(ReadOnlySequence<byte> data, CancellationToken cancellationToken)
{
ThrowIfDisposed();
- var response = await _executor.Execute(() => _channel.Send(data, cancellationToken), cancellationToken);
+ var response = await _executor.Execute(() => _channel.Send(data, cancellationToken), cancellationToken).ConfigureAwait(false);
return new MessageId(response.MessageId);
}
public async ValueTask<MessageId> Send(MessageMetadata metadata, byte[] data, CancellationToken cancellationToken)
- => await Send(metadata, new ReadOnlySequence<byte>(data), cancellationToken);
+ => await Send(metadata, new ReadOnlySequence<byte>(data), cancellationToken).ConfigureAwait(false);
public async ValueTask<MessageId> Send(MessageMetadata metadata, ReadOnlyMemory<byte> data, CancellationToken cancellationToken)
- => await Send(metadata, new ReadOnlySequence<byte>(data), cancellationToken);
+ => await Send(metadata, new ReadOnlySequence<byte>(data), cancellationToken).ConfigureAwait(false);
public async ValueTask<MessageId> Send(MessageMetadata metadata, ReadOnlySequence<byte> data, CancellationToken cancellationToken)
{
ThrowIfDisposed();
- var response = await _executor.Execute(() => _channel.Send(metadata.Metadata, data, cancellationToken), cancellationToken);
+ var response = await _executor.Execute(() => _channel.Send(metadata.Metadata, data, cancellationToken), cancellationToken).ConfigureAwait(false);
return new MessageId(response.MessageId);
}
diff --git a/src/DotPulsar/Internal/ProducerChannel.cs b/src/DotPulsar/Internal/ProducerChannel.cs
index 49b70a9..0a9dff9 100644
--- a/src/DotPulsar/Internal/ProducerChannel.cs
+++ b/src/DotPulsar/Internal/ProducerChannel.cs
@@ -54,7 +54,7 @@ namespace DotPulsar.Internal
{
try
{
- await _connection.Send(new CommandCloseProducer { ProducerId = _id }, CancellationToken.None);
+ await _connection.Send(new CommandCloseProducer { ProducerId = _id }, CancellationToken.None).ConfigureAwait(false);
}
catch
{
@@ -66,7 +66,7 @@ namespace DotPulsar.Internal
{
_cachedSendPackage.Metadata = _cachedMetadata;
_cachedSendPackage.Payload = payload;
- return await SendPackage(true, cancellationToken);
+ return await SendPackage(true, cancellationToken).ConfigureAwait(false);
}
public async Task<CommandSendReceipt> Send(PulsarApi.MessageMetadata metadata, ReadOnlySequence<byte> payload, CancellationToken cancellationToken)
@@ -74,7 +74,7 @@ namespace DotPulsar.Internal
metadata.ProducerName = _cachedMetadata.ProducerName;
_cachedSendPackage.Metadata = metadata;
_cachedSendPackage.Payload = payload;
- return await SendPackage(metadata.SequenceId == 0, cancellationToken);
+ return await SendPackage(metadata.SequenceId == 0, cancellationToken).ConfigureAwait(false);
}
private async Task<CommandSendReceipt> SendPackage(bool autoAssignSequenceId, CancellationToken cancellationToken)
@@ -91,7 +91,7 @@ namespace DotPulsar.Internal
else
_cachedSendPackage.Command.SequenceId = _cachedSendPackage.Metadata.SequenceId;
- var response = await _connection.Send(_cachedSendPackage, cancellationToken);
+ var response = await _connection.Send(_cachedSendPackage, cancellationToken).ConfigureAwait(false);
response.Expect(BaseCommand.Type.SendReceipt);
if (autoAssignSequenceId)
diff --git a/src/DotPulsar/Internal/ProducerChannelFactory.cs b/src/DotPulsar/Internal/ProducerChannelFactory.cs
index 7426c22..ae8c15e 100644
--- a/src/DotPulsar/Internal/ProducerChannelFactory.cs
+++ b/src/DotPulsar/Internal/ProducerChannelFactory.cs
@@ -50,13 +50,13 @@ namespace DotPulsar.Internal
}
public async Task<IProducerChannel> Create(CancellationToken cancellationToken)
- => await _executor.Execute(() => GetChannel(cancellationToken), cancellationToken);
+ => await _executor.Execute(() => GetChannel(cancellationToken), cancellationToken).ConfigureAwait(false);
private async ValueTask<IProducerChannel> GetChannel(CancellationToken cancellationToken)
{
- var connection = await _connectionPool.FindConnectionForTopic(_commandProducer.Topic, cancellationToken);
+ var connection = await _connectionPool.FindConnectionForTopic(_commandProducer.Topic, cancellationToken).ConfigureAwait(false);
var channel = new Channel(_correlationId, _eventRegister, new AsyncQueue<MessagePackage>());
- var response = await connection.Send(_commandProducer, channel, cancellationToken);
+ var response = await connection.Send(_commandProducer, channel, cancellationToken).ConfigureAwait(false);
return new ProducerChannel(response.ProducerId, response.ProducerName, _sequenceId, connection);
}
}
diff --git a/src/DotPulsar/Internal/ProducerProcess.cs b/src/DotPulsar/Internal/ProducerProcess.cs
index ff50d83..a4e7d52 100644
--- a/src/DotPulsar/Internal/ProducerProcess.cs
+++ b/src/DotPulsar/Internal/ProducerProcess.cs
@@ -39,7 +39,7 @@ namespace DotPulsar.Internal
{
_stateManager.SetState(ProducerState.Closed);
CancellationTokenSource.Cancel();
- await _producer.DisposeAsync();
+ await _producer.DisposeAsync().ConfigureAwait(false);
}
protected override void CalculateState()
@@ -72,13 +72,13 @@ namespace DotPulsar.Internal
try
{
- channel = await _factory.Create(CancellationTokenSource.Token);
+ channel = await _factory.Create(CancellationTokenSource.Token).ConfigureAwait(false);
_producer.SetChannel(channel);
}
catch
{
if (channel != null)
- await channel.DisposeAsync();
+ await channel.DisposeAsync().ConfigureAwait(false);
}
}
}
diff --git a/src/DotPulsar/Internal/PulsarStream.cs b/src/DotPulsar/Internal/PulsarStream.cs
index 99eef2f..6c7b330 100644
--- a/src/DotPulsar/Internal/PulsarStream.cs
+++ b/src/DotPulsar/Internal/PulsarStream.cs
@@ -53,12 +53,12 @@ namespace DotPulsar.Internal
foreach (var segment in sequence)
{
var data = segment.ToArray();
- await _stream.WriteAsync(data, 0, data.Length);
+ await _stream.WriteAsync(data, 0, data.Length).ConfigureAwait(false);
}
#else
foreach (var segment in sequence)
{
- await _stream.WriteAsync(segment);
+ await _stream.WriteAsync(segment).ConfigureAwait(false);
}
#endif
}
@@ -73,7 +73,7 @@ namespace DotPulsar.Internal
#if NETSTANDARD2_0
_stream.Dispose();
#else
- await _stream.DisposeAsync();
+ await _stream.DisposeAsync().ConfigureAwait(false);
#endif
}
@@ -90,17 +90,17 @@ namespace DotPulsar.Internal
{
var memory = _writer.GetMemory(84999); // LOH - 1 byte
#if NETSTANDARD2_0
- var bytesRead = await _stream.ReadAsync(buffer, 0, buffer.Length, cancellationToken);
+ var bytesRead = await _stream.ReadAsync(buffer, 0, buffer.Length, cancellationToken).ConfigureAwait(false);
new Memory<byte>(buffer, 0, bytesRead).CopyTo(memory);
#else
- var bytesRead = await _stream.ReadAsync(memory, cancellationToken);
+ var bytesRead = await _stream.ReadAsync(memory, cancellationToken).ConfigureAwait(false);
#endif
if (bytesRead == 0)
break;
_writer.Advance(bytesRead);
- var result = await _writer.FlushAsync(cancellationToken);
+ var result = await _writer.FlushAsync(cancellationToken).ConfigureAwait(false);
if (result.IsCompleted)
break;
}
@@ -122,7 +122,7 @@ namespace DotPulsar.Internal
{
while (true)
{
- var result = await _reader.ReadAsync(cancellationToken);
+ var result = await _reader.ReadAsync(cancellationToken).ConfigureAwait(false);
var buffer = result.Buffer;
while (true)
diff --git a/src/DotPulsar/Internal/Reader.cs b/src/DotPulsar/Internal/Reader.cs
index 4804d67..6c7d1c5 100644
--- a/src/DotPulsar/Internal/Reader.cs
+++ b/src/DotPulsar/Internal/Reader.cs
@@ -51,10 +51,10 @@ namespace DotPulsar.Internal
}
public async ValueTask<ReaderState> StateChangedTo(ReaderState state, CancellationToken cancellationToken)
- => await _state.StateChangedTo(state, cancellationToken);
+ => await _state.StateChangedTo(state, cancellationToken).ConfigureAwait(false);
public async ValueTask<ReaderState> StateChangedFrom(ReaderState state, CancellationToken cancellationToken)
- => await _state.StateChangedFrom(state, cancellationToken);
+ => await _state.StateChangedFrom(state, cancellationToken).ConfigureAwait(false);
public bool IsFinalState() => _state.IsFinalState();
@@ -66,7 +66,7 @@ namespace DotPulsar.Internal
while (!cancellationToken.IsCancellationRequested)
{
- yield return await _executor.Execute(() => _channel.Receive(cancellationToken), cancellationToken);
+ yield return await _executor.Execute(() => _channel.Receive(cancellationToken), cancellationToken).ConfigureAwait(false);
}
}
@@ -76,7 +76,7 @@ namespace DotPulsar.Internal
return;
_eventRegister.Register(new ReaderDisposed(_correlationId, this));
- await _channel.DisposeAsync();
+ await _channel.DisposeAsync().ConfigureAwait(false);
}
internal void SetChannel(IReaderChannel channel)
diff --git a/src/DotPulsar/Internal/ReaderChannelFactory.cs b/src/DotPulsar/Internal/ReaderChannelFactory.cs
index e140854..d37319f 100644
--- a/src/DotPulsar/Internal/ReaderChannelFactory.cs
+++ b/src/DotPulsar/Internal/ReaderChannelFactory.cs
@@ -57,14 +57,14 @@ namespace DotPulsar.Internal
}
public async Task<IReaderChannel> Create(CancellationToken cancellationToken)
- => await _executor.Execute(() => GetChannel(cancellationToken), cancellationToken);
+ => await _executor.Execute(() => GetChannel(cancellationToken), cancellationToken).ConfigureAwait(false);
private async ValueTask<IReaderChannel> GetChannel(CancellationToken cancellationToken)
{
- var connection = await _connectionPool.FindConnectionForTopic(_subscribe.Topic, cancellationToken);
+ var connection = await _connectionPool.FindConnectionForTopic(_subscribe.Topic, cancellationToken).ConfigureAwait(false);
var messageQueue = new AsyncQueue<MessagePackage>();
var channel = new Channel(_correlationId, _eventRegister, messageQueue);
- var response = await connection.Send(_subscribe, channel, cancellationToken);
+ var response = await connection.Send(_subscribe, channel, cancellationToken).ConfigureAwait(false);
return new ConsumerChannel(response.ConsumerId, _messagePrefetchCount, messageQueue, connection, _batchHandler);
}
}
diff --git a/src/DotPulsar/Internal/ReaderProcess.cs b/src/DotPulsar/Internal/ReaderProcess.cs
index b51f585..3a7ec7d 100644
--- a/src/DotPulsar/Internal/ReaderProcess.cs
+++ b/src/DotPulsar/Internal/ReaderProcess.cs
@@ -39,7 +39,7 @@ namespace DotPulsar.Internal
{
_stateManager.SetState(ReaderState.Closed);
CancellationTokenSource.Cancel();
- await _reader.DisposeAsync();
+ await _reader.DisposeAsync().ConfigureAwait(false);
}
protected override void CalculateState()
@@ -75,13 +75,13 @@ namespace DotPulsar.Internal
try
{
- channel = await _factory.Create(CancellationTokenSource.Token);
+ channel = await _factory.Create(CancellationTokenSource.Token).ConfigureAwait(false);
_reader.SetChannel(channel);
}
catch
{
if (channel != null)
- await channel.DisposeAsync();
+ await channel.DisposeAsync().ConfigureAwait(false);
}
}
}
diff --git a/src/DotPulsar/PulsarClient.cs b/src/DotPulsar/PulsarClient.cs
index 21e4aaf..ad5f0e5 100644
--- a/src/DotPulsar/PulsarClient.cs
+++ b/src/DotPulsar/PulsarClient.cs
@@ -88,7 +88,7 @@ namespace DotPulsar
return;
if (_processManager is IAsyncDisposable disposable)
- await disposable.DisposeAsync();
+ await disposable.DisposeAsync().ConfigureAwait(false);
DotPulsarEventSource.Log.ClientDisposed();
}