You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by bl...@apache.org on 2020/11/11 11:45:09 UTC

[pulsar-dotpulsar] branch master updated: Minor performance improvements. Moving to .NET 5. Start using FluentAssertions.

This is an automated email from the ASF dual-hosted git repository.

blankensteiner pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-dotpulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 27891d0  Minor performance improvements. Moving to .NET 5. Start using FluentAssertions.
27891d0 is described below

commit 27891d0a6e12ceee1b0f46f72ca29227f973a944
Author: Daniel Blankensteiner <db...@vmail.dk>
AuthorDate: Wed Nov 11 12:44:53 2020 +0100

    Minor performance improvements. Moving to .NET 5. Start using FluentAssertions.
---
 samples/Consuming/Consuming.csproj                 |  2 +-
 samples/Producing/Producing.csproj                 |  4 +-
 samples/Reading/Reading.csproj                     |  4 +-
 src/Directory.Build.props                          |  2 +-
 src/DotPulsar/DotPulsar.csproj                     |  8 ++--
 src/DotPulsar/Internal/AsyncLock.cs                |  4 +-
 src/DotPulsar/Internal/AsyncQueue.cs               |  8 ++--
 src/DotPulsar/Internal/Awaiter.cs                  |  2 +-
 src/DotPulsar/Internal/Connection.cs               |  3 +-
 src/DotPulsar/Internal/ConnectionPool.cs           |  4 +-
 src/DotPulsar/Internal/Connector.cs                |  4 +-
 src/DotPulsar/Internal/Constants.cs                |  7 ++-
 src/DotPulsar/Internal/Consumer.cs                 |  2 +-
 src/DotPulsar/Internal/ConsumerChannel.cs          |  2 +-
 src/DotPulsar/Internal/IdLookup.cs                 |  6 +--
 src/DotPulsar/Internal/Producer.cs                 |  2 +-
 src/DotPulsar/Internal/PulsarStream.cs             |  2 +-
 src/DotPulsar/Internal/Reader.cs                   |  2 +-
 src/DotPulsar/Internal/RequestResponseHandler.cs   |  4 +-
 src/DotPulsar/Internal/SequenceBuilder.cs          | 25 +++++------
 src/DotPulsar/Internal/StateTaskCollection.cs      |  2 +-
 src/DotPulsar/Message.cs                           |  6 +--
 src/DotPulsar/MessageId.cs                         |  8 ++--
 tests/Directory.Build.props                        |  2 +-
 tests/DotPulsar.StressTests/ConsumerTests.cs       | 50 +++++++++++----------
 .../DotPulsar.StressTests.csproj                   |  4 +-
 .../EnumerableTaskExtensions.cs                    | 28 ++++++------
 .../Fixtures/StandaloneClusterFixture.cs           |  2 +
 tests/DotPulsar.Tests/DotPulsar.Tests.csproj       |  3 +-
 tests/DotPulsar.Tests/Internal/AsyncLockTests.cs   | 15 ++++---
 tests/DotPulsar.Tests/Internal/AsyncQueueTests.cs  | 37 ++++++++--------
 tests/DotPulsar.Tests/Internal/Crc32CTests.cs      |  6 +--
 .../Extensions/ReadOnlySequenceExtensionsTests.cs  | 21 ++++-----
 .../Internal/SequenceBuilderTests.cs               | 51 ++++++++++++++--------
 tests/DotPulsar.Tests/Internal/SerializerTests.cs  |  4 +-
 .../DotPulsar.Tests/Internal/StateManagerTests.cs  | 25 ++++++-----
 tests/DotPulsar.Tests/MessageIdTests.cs            | 33 +++++++-------
 tests/docker-compose-standalone-tests.yml          |  2 +-
 38 files changed, 212 insertions(+), 184 deletions(-)

diff --git a/samples/Consuming/Consuming.csproj b/samples/Consuming/Consuming.csproj
index 435ebe2..6e05a4d 100644
--- a/samples/Consuming/Consuming.csproj
+++ b/samples/Consuming/Consuming.csproj
@@ -2,7 +2,7 @@
 
   <PropertyGroup>
     <OutputType>Exe</OutputType>
-    <TargetFramework>netcoreapp3.1</TargetFramework>
+    <TargetFramework>net5.0</TargetFramework>
   </PropertyGroup>
 
   <ItemGroup>
diff --git a/samples/Producing/Producing.csproj b/samples/Producing/Producing.csproj
index 364dcdb..6e05a4d 100644
--- a/samples/Producing/Producing.csproj
+++ b/samples/Producing/Producing.csproj
@@ -1,8 +1,8 @@
-<Project Sdk="Microsoft.NET.Sdk">
+<Project Sdk="Microsoft.NET.Sdk">
 
   <PropertyGroup>
     <OutputType>Exe</OutputType>
-    <TargetFramework>netcoreapp3.1</TargetFramework>
+    <TargetFramework>net5.0</TargetFramework>
   </PropertyGroup>
 
   <ItemGroup>
diff --git a/samples/Reading/Reading.csproj b/samples/Reading/Reading.csproj
index 364dcdb..6e05a4d 100644
--- a/samples/Reading/Reading.csproj
+++ b/samples/Reading/Reading.csproj
@@ -1,8 +1,8 @@
-<Project Sdk="Microsoft.NET.Sdk">
+<Project Sdk="Microsoft.NET.Sdk">
 
   <PropertyGroup>
     <OutputType>Exe</OutputType>
-    <TargetFramework>netcoreapp3.1</TargetFramework>
+    <TargetFramework>net5.0</TargetFramework>
   </PropertyGroup>
 
   <ItemGroup>
diff --git a/src/Directory.Build.props b/src/Directory.Build.props
index 9ff9438..7254a3d 100644
--- a/src/Directory.Build.props
+++ b/src/Directory.Build.props
@@ -1,7 +1,7 @@
 <Project>
 
   <PropertyGroup>
-    <LangVersion>8.0</LangVersion>
+    <LangVersion>9.0</LangVersion>
     <Nullable>enable</Nullable>
   </PropertyGroup>
 
diff --git a/src/DotPulsar/DotPulsar.csproj b/src/DotPulsar/DotPulsar.csproj
index 5ca67b5..6f1a40a 100644
--- a/src/DotPulsar/DotPulsar.csproj
+++ b/src/DotPulsar/DotPulsar.csproj
@@ -1,7 +1,7 @@
 <Project Sdk="Microsoft.NET.Sdk">
 
   <PropertyGroup>
-    <TargetFrameworks>netstandard2.0;netstandard2.1</TargetFrameworks>
+    <TargetFrameworks>netstandard2.0;netstandard2.1;netcoreapp3.1;net5.0</TargetFrameworks>
     <Version>0.9.6</Version>
     <AssemblyVersion>$(Version)</AssemblyVersion>
     <FileVersion>$(Version)</FileVersion>
@@ -21,14 +21,14 @@
   </PropertyGroup>
 
   <ItemGroup>    
-    <PackageReference Include="Microsoft.Extensions.ObjectPool" Version="3.1.9" />    
+    <PackageReference Include="Microsoft.Extensions.ObjectPool" Version="5.0.0" />    
     <PackageReference Include="Microsoft.SourceLink.GitHub" Version="1.0.0" PrivateAssets="All" />
     <PackageReference Include="protobuf-net" Version="2.4.6" />
-    <PackageReference Include="System.IO.Pipelines" Version="4.7.3" />
+    <PackageReference Include="System.IO.Pipelines" Version="5.0.0" />
   </ItemGroup>
 
   <ItemGroup Condition="'$(TargetFramework)' == 'netstandard2.0'">
-    <PackageReference Include="Microsoft.Bcl.AsyncInterfaces" Version="1.1.1" />
+    <PackageReference Include="Microsoft.Bcl.AsyncInterfaces" Version="5.0.0" />
     <PackageReference Include="Microsoft.Bcl.HashCode" Version="1.1.0" />
   </ItemGroup>
 
diff --git a/src/DotPulsar/Internal/AsyncLock.cs b/src/DotPulsar/Internal/AsyncLock.cs
index 2e42b92..432eb37 100644
--- a/src/DotPulsar/Internal/AsyncLock.cs
+++ b/src/DotPulsar/Internal/AsyncLock.cs
@@ -99,9 +99,9 @@ namespace DotPulsar.Internal
         {
             lock (_pending)
             {
-                if (_pending.Count > 0)
+                var node = _pending.First;
+                if (node is not null)
                 {
-                    var node = _pending.First;
                     node.Value.SetResult(_releaser);
                     node.Value.Dispose();
                     _pending.RemoveFirst();
diff --git a/src/DotPulsar/Internal/AsyncQueue.cs b/src/DotPulsar/Internal/AsyncQueue.cs
index 6225138..5c22c43 100644
--- a/src/DotPulsar/Internal/AsyncQueue.cs
+++ b/src/DotPulsar/Internal/AsyncQueue.cs
@@ -41,12 +41,12 @@ namespace DotPulsar.Internal
             {
                 ThrowIfDisposed();
 
-                if (_pendingDequeues.Count > 0)
+                var node = _pendingDequeues.First;
+                if (node is not null)
                 {
-                    var tcs = _pendingDequeues.First;
+                    node.Value.SetResult(item);
+                    node.Value.Dispose();
                     _pendingDequeues.RemoveFirst();
-                    tcs.Value.SetResult(item);
-                    tcs.Value.Dispose();
                 }
                 else
                     _queue.Enqueue(item);
diff --git a/src/DotPulsar/Internal/Awaiter.cs b/src/DotPulsar/Internal/Awaiter.cs
index 9f7a8c4..ebbd254 100644
--- a/src/DotPulsar/Internal/Awaiter.cs
+++ b/src/DotPulsar/Internal/Awaiter.cs
@@ -18,7 +18,7 @@ namespace DotPulsar.Internal
     using System.Collections.Concurrent;
     using System.Threading.Tasks;
 
-    public sealed class Awaiter<T, TResult> : IDisposable
+    public sealed class Awaiter<T, TResult> : IDisposable where T : notnull
     {
         private readonly ConcurrentDictionary<T, TaskCompletionSource<TResult>> _items;
 
diff --git a/src/DotPulsar/Internal/Connection.cs b/src/DotPulsar/Internal/Connection.cs
index d444586..7829173 100644
--- a/src/DotPulsar/Internal/Connection.cs
+++ b/src/DotPulsar/Internal/Connection.cs
@@ -19,6 +19,7 @@ namespace DotPulsar.Internal
     using Extensions;
     using PulsarApi;
     using System;
+    using System.Buffers;
     using System.Threading;
     using System.Threading.Tasks;
 
@@ -231,7 +232,7 @@ namespace DotPulsar.Internal
                     switch (command.CommandType)
                     {
                         case BaseCommand.Type.Message:
-                            _channelManager.Incoming(command.Message, frame.Slice(commandSize + 4));
+                            _channelManager.Incoming(command.Message, new ReadOnlySequence<byte>(frame.Slice(commandSize + 4).ToArray()));
                             break;
                         case BaseCommand.Type.CloseConsumer:
                             _channelManager.Incoming(command.CloseConsumer);
diff --git a/src/DotPulsar/Internal/ConnectionPool.cs b/src/DotPulsar/Internal/ConnectionPool.cs
index 8f05343..c394cfc 100644
--- a/src/DotPulsar/Internal/ConnectionPool.cs
+++ b/src/DotPulsar/Internal/ConnectionPool.cs
@@ -141,7 +141,7 @@ namespace DotPulsar.Internal
         {
             using (await _lock.Lock(cancellationToken).ConfigureAwait(false))
             {
-                if (_connections.TryGetValue(url, out Connection connection))
+                if (_connections.TryGetValue(url, out Connection? connection) && connection is not null)
                     return connection;
 
                 return await EstablishNewConnection(url, cancellationToken).ConfigureAwait(false);
@@ -167,7 +167,7 @@ namespace DotPulsar.Internal
 
         private async ValueTask DisposeConnection(PulsarUrl serviceUrl)
         {
-            if (_connections.TryRemove(serviceUrl, out Connection connection))
+            if (_connections.TryRemove(serviceUrl, out Connection? connection) && connection is not null)
             {
                 await connection.DisposeAsync().ConfigureAwait(false);
                 DotPulsarEventSource.Log.ConnectionDisposed();
diff --git a/src/DotPulsar/Internal/Connector.cs b/src/DotPulsar/Internal/Connector.cs
index 203658f..edd856b 100644
--- a/src/DotPulsar/Internal/Connector.cs
+++ b/src/DotPulsar/Internal/Connector.cs
@@ -110,7 +110,7 @@ namespace DotPulsar.Internal
             }
         }
 
-        private bool ValidateServerCertificate(object sender, X509Certificate certificate, X509Chain chain, SslPolicyErrors sslPolicyErrors)
+        private bool ValidateServerCertificate(object sender, X509Certificate? certificate, X509Chain? chain, SslPolicyErrors sslPolicyErrors)
         {
             if (sslPolicyErrors == SslPolicyErrors.None)
                 return true;
@@ -123,7 +123,7 @@ namespace DotPulsar.Internal
 
             if (sslPolicyErrors.HasFlag(SslPolicyErrors.RemoteCertificateChainErrors) && _verifyCertificateAuthority)
             {
-                if (_trustedCertificateAuthority is null)
+                if (_trustedCertificateAuthority is null || chain is null || certificate is null)
                     return false;
 
                 chain.ChainPolicy.ExtraStore.Add(_trustedCertificateAuthority);
diff --git a/src/DotPulsar/Internal/Constants.cs b/src/DotPulsar/Internal/Constants.cs
index 9c21fd2..abc1f75 100644
--- a/src/DotPulsar/Internal/Constants.cs
+++ b/src/DotPulsar/Internal/Constants.cs
@@ -14,6 +14,7 @@
 
 namespace DotPulsar.Internal
 {
+    using System;
     using System.Reflection;
 
     public static class Constants
@@ -21,7 +22,11 @@ namespace DotPulsar.Internal
         static Constants()
         {
             var assemblyName = Assembly.GetCallingAssembly().GetName();
-            ClientVersion = $"{assemblyName.Name} {assemblyName.Version.ToString(3)}";
+            var assemblyVersion = assemblyName.Version;
+            if (assemblyVersion is null)
+                throw new Exception("Assembly version of CallingAssembly is null");
+
+            ClientVersion = $"{assemblyName.Name} {assemblyVersion.ToString(3)}";
             ProtocolVersion = 14;
             PulsarScheme = "pulsar";
             PulsarSslScheme = "pulsar+ssl";
diff --git a/src/DotPulsar/Internal/Consumer.cs b/src/DotPulsar/Internal/Consumer.cs
index c6162d2..33e807f 100644
--- a/src/DotPulsar/Internal/Consumer.cs
+++ b/src/DotPulsar/Internal/Consumer.cs
@@ -188,7 +188,7 @@ namespace DotPulsar.Internal
             var oldChannel = _channel;
             _channel = channel;
 
-            if (oldChannel != null)
+            if (oldChannel is not null)
                 await oldChannel.DisposeAsync().ConfigureAwait(false);
         }
 
diff --git a/src/DotPulsar/Internal/ConsumerChannel.cs b/src/DotPulsar/Internal/ConsumerChannel.cs
index 7ea2255..2dd2325 100644
--- a/src/DotPulsar/Internal/ConsumerChannel.cs
+++ b/src/DotPulsar/Internal/ConsumerChannel.cs
@@ -69,7 +69,7 @@ namespace DotPulsar.Internal
 
                     var message = _batchHandler.GetNext();
 
-                    if (message != null)
+                    if (message is not null)
                         return message;
 
                     var messagePackage = await _queue.Dequeue(cancellationToken).ConfigureAwait(false);
diff --git a/src/DotPulsar/Internal/IdLookup.cs b/src/DotPulsar/Internal/IdLookup.cs
index 85c7ee0..8b674e1 100644
--- a/src/DotPulsar/Internal/IdLookup.cs
+++ b/src/DotPulsar/Internal/IdLookup.cs
@@ -29,7 +29,7 @@ namespace DotPulsar.Internal
             {
                 for (var i = 0; i < _items.Length; ++i)
                 {
-                    if (_items[i] != null)
+                    if (_items[i] is not null)
                         return false;
                 }
 
@@ -43,7 +43,7 @@ namespace DotPulsar.Internal
             {
                 for (var i = 0; i < _items.Length; ++i)
                 {
-                    if (_items[i] != null)
+                    if (_items[i] is not null)
                         continue;
 
                     _items[i] = item;
@@ -79,7 +79,7 @@ namespace DotPulsar.Internal
                 {
                     var item = _items[i];
 
-                    if (item != null)
+                    if (item is not null)
                     {
                         items.Add(item);
                         _items[i] = null;
diff --git a/src/DotPulsar/Internal/Producer.cs b/src/DotPulsar/Internal/Producer.cs
index 8f4ed1a..9e81810 100644
--- a/src/DotPulsar/Internal/Producer.cs
+++ b/src/DotPulsar/Internal/Producer.cs
@@ -147,7 +147,7 @@ namespace DotPulsar.Internal
             var oldChannel = _channel;
             _channel = channel;
 
-            if (oldChannel != null)
+            if (oldChannel is not null)
                 await oldChannel.DisposeAsync().ConfigureAwait(false);
         }
 
diff --git a/src/DotPulsar/Internal/PulsarStream.cs b/src/DotPulsar/Internal/PulsarStream.cs
index 6f820cf..6b0f598 100644
--- a/src/DotPulsar/Internal/PulsarStream.cs
+++ b/src/DotPulsar/Internal/PulsarStream.cs
@@ -140,7 +140,7 @@ namespace DotPulsar.Internal
                         if (buffer.Length < totalSize)
                             break;
 
-                        yield return new ReadOnlySequence<byte>(buffer.Slice(4, frameSize).ToArray());
+                        yield return buffer.Slice(4, frameSize);
 
                         buffer = buffer.Slice(totalSize);
                     }
diff --git a/src/DotPulsar/Internal/Reader.cs b/src/DotPulsar/Internal/Reader.cs
index 03f4e4f..82f0e91 100644
--- a/src/DotPulsar/Internal/Reader.cs
+++ b/src/DotPulsar/Internal/Reader.cs
@@ -100,7 +100,7 @@ namespace DotPulsar.Internal
             var oldChannel = _channel;
             _channel = channel;
 
-            if (oldChannel != null)
+            if (oldChannel is not null)
                 await oldChannel.DisposeAsync().ConfigureAwait(false);
         }
 
diff --git a/src/DotPulsar/Internal/RequestResponseHandler.cs b/src/DotPulsar/Internal/RequestResponseHandler.cs
index 1885aa4..42e62df 100644
--- a/src/DotPulsar/Internal/RequestResponseHandler.cs
+++ b/src/DotPulsar/Internal/RequestResponseHandler.cs
@@ -23,7 +23,7 @@ namespace DotPulsar.Internal
         private const string ConnectResponseIdentifier = "Connected";
 
         private readonly Awaiter<string, BaseCommand> _responses;
-        private RequestId _requestId;
+        private readonly RequestId _requestId;
 
         public RequestResponseHandler()
         {
@@ -44,7 +44,7 @@ namespace DotPulsar.Internal
         {
             var identifier = GetResponseIdentifier(command);
 
-            if (identifier != null)
+            if (identifier is not null)
                 _responses.SetResult(identifier, command);
         }
 
diff --git a/src/DotPulsar/Internal/SequenceBuilder.cs b/src/DotPulsar/Internal/SequenceBuilder.cs
index 4ad0a41..b2433ea 100644
--- a/src/DotPulsar/Internal/SequenceBuilder.cs
+++ b/src/DotPulsar/Internal/SequenceBuilder.cs
@@ -64,26 +64,23 @@ namespace DotPulsar.Internal
 
         public ReadOnlySequence<T> Build()
         {
-            if (_elements.Count == 0)
+            var node = _elements.First;
+            if (node is null)
                 return new ReadOnlySequence<T>();
 
-            Segment? start = null;
-            Segment? current = null;
+            var current = new Segment(node.Value);
+            var start = current;
 
-            foreach (var element in _elements)
+            while (true)
             {
-                if (current is null)
-                {
-                    current = new Segment(element);
-                    start = current;
-                }
-                else
-                {
-                    current = current.CreateNext(element);
-                }
+                node = node.Next;
+                if (node is null)
+                    break;
+
+                current = current.CreateNext(node.Value);
             }
 
-            return new ReadOnlySequence<T>(start, 0, current, current!.Memory.Length);
+            return new ReadOnlySequence<T>(start, 0, current, current.Memory.Length);
         }
 
         private sealed class Segment : ReadOnlySequenceSegment<T>
diff --git a/src/DotPulsar/Internal/StateTaskCollection.cs b/src/DotPulsar/Internal/StateTaskCollection.cs
index 745777a..5bfac23 100644
--- a/src/DotPulsar/Internal/StateTaskCollection.cs
+++ b/src/DotPulsar/Internal/StateTaskCollection.cs
@@ -49,7 +49,7 @@ namespace DotPulsar.Internal
             {
                 var awaiter = _awaiters.First;
 
-                while (awaiter != null)
+                while (awaiter is not null)
                 {
                     var next = awaiter.Next;
 
diff --git a/src/DotPulsar/Message.cs b/src/DotPulsar/Message.cs
index afeb39c..c921711 100644
--- a/src/DotPulsar/Message.cs
+++ b/src/DotPulsar/Message.cs
@@ -109,7 +109,7 @@ namespace DotPulsar
         /// <summary>
         /// Check whether the message has a key.
         /// </summary>
-        public bool HasKey => Key != null;
+        public bool HasKey => Key is not null;
 
         /// <summary>
         /// The key as a string.
@@ -119,12 +119,12 @@ namespace DotPulsar
         /// <summary>
         /// The key as bytes.
         /// </summary>
-        public byte[]? KeyBytes => HasBase64EncodedKey ? Convert.FromBase64String(Key) : null;
+        public byte[]? KeyBytes => Key is not null ? Convert.FromBase64String(Key) : null;
 
         /// <summary>
         /// Check whether the message has an ordering key.
         /// </summary>
-        public bool HasOrderingKey => OrderingKey != null;
+        public bool HasOrderingKey => OrderingKey is not null;
 
         /// <summary>
         /// The ordering key of the message.
diff --git a/src/DotPulsar/MessageId.cs b/src/DotPulsar/MessageId.cs
index 55ab235..27efc62 100644
--- a/src/DotPulsar/MessageId.cs
+++ b/src/DotPulsar/MessageId.cs
@@ -72,14 +72,14 @@ namespace DotPulsar
         /// </summary>
         public int BatchIndex => Data.BatchIndex;
 
-        public override bool Equals(object o)
+        public override bool Equals(object? o)
             => o is MessageId id && Equals(id);
 
-        public bool Equals(MessageId other)
-            => !(other is null) && LedgerId == other.LedgerId && EntryId == other.EntryId && Partition == other.Partition && BatchIndex == other.BatchIndex;
+        public bool Equals(MessageId? other)
+            => other is not null && LedgerId == other.LedgerId && EntryId == other.EntryId && Partition == other.Partition && BatchIndex == other.BatchIndex;
 
         public static bool operator ==(MessageId x, MessageId y)
-            => ReferenceEquals(x, y) || (x is object && x.Equals(y));
+            => ReferenceEquals(x, y) || (x is not null && x.Equals(y));
 
         public static bool operator !=(MessageId x, MessageId y)
             => !(x == y);
diff --git a/tests/Directory.Build.props b/tests/Directory.Build.props
index 9ff9438..7254a3d 100644
--- a/tests/Directory.Build.props
+++ b/tests/Directory.Build.props
@@ -1,7 +1,7 @@
 <Project>
 
   <PropertyGroup>
-    <LangVersion>8.0</LangVersion>
+    <LangVersion>9.0</LangVersion>
     <Nullable>enable</Nullable>
   </PropertyGroup>
 
diff --git a/tests/DotPulsar.StressTests/ConsumerTests.cs b/tests/DotPulsar.StressTests/ConsumerTests.cs
index 6dd648f..fa32611 100644
--- a/tests/DotPulsar.StressTests/ConsumerTests.cs
+++ b/tests/DotPulsar.StressTests/ConsumerTests.cs
@@ -14,12 +14,12 @@
 
 namespace DotPulsar.StressTests
 {
+    using DotPulsar.Abstractions;
     using Extensions;
     using Fixtures;
     using FluentAssertions;
     using System;
     using System.Collections.Generic;
-    using System.Linq;
     using System.Text;
     using System.Threading;
     using System.Threading.Tasks;
@@ -46,10 +46,11 @@ namespace DotPulsar.StressTests
             await using var client = PulsarClient.Builder()
                 .ExceptionHandler(new XunitExceptionHandler(_output))
                 .ServiceUrl(new Uri("pulsar://localhost:54545"))
-                .Build(); //Connecting to pulsar://localhost:6650
+                .Build();
 
             await using var consumer = client.NewConsumer()
                 .ConsumerName($"consumer-{testRunId}")
+                .InitialPosition(SubscriptionInitialPosition.Earliest)
                 .SubscriptionName($"subscription-{testRunId}")
                 .Topic(topic)
                 .Create();
@@ -62,38 +63,43 @@ namespace DotPulsar.StressTests
             var cts = new CancellationTokenSource(TimeSpan.FromSeconds(60));
 
             //Act
-            var consume = ConsumeMessages(cts.Token);
-            var produce = ProduceMessages(cts.Token);
-
-            var consumed = await consume.ConfigureAwait(false);
-            var produced = await produce.ConfigureAwait(false);
+            var produced = await ProduceMessages(producer, numberOfMessages, cts.Token);
+            var consumed = await ConsumeMessages(consumer, numberOfMessages, cts.Token);
 
             //Assert
             consumed.Should().BeEquivalentTo(produced);
+        }
 
-            Task<MessageId[]> ProduceMessages(CancellationToken ct)
-                => Enumerable.Range(1, numberOfMessages)
-                    .Select(async n => await producer.Send(Encoding.UTF8.GetBytes($"Sent #{n} at {DateTimeOffset.UtcNow:s}"), ct).ConfigureAwait(false))
-                    .WhenAll();
+        private async Task<IEnumerable<MessageId>> ProduceMessages(IProducer producer, int numberOfMessages, CancellationToken ct)
+        {
+            var messageIds = new MessageId[numberOfMessages];
 
-            async Task<List<MessageId>> ConsumeMessages(CancellationToken ct)
+            for (var i = 0; i < numberOfMessages; ++i)
             {
-                var ids = new List<MessageId>(numberOfMessages);
+                var data = Encoding.UTF8.GetBytes($"Sent #{i} at {DateTimeOffset.UtcNow:s}");
+                messageIds[i] = await producer.Send(data, ct);
+            }
 
-                await foreach (var message in consumer.Messages(ct))
-                {
-                    ids.Add(message.MessageId);
+            return messageIds;
+        }
 
-                    if (ids.Count != numberOfMessages)
-                        continue;
+        private async Task<IEnumerable<MessageId>> ConsumeMessages(IConsumer consumer, int numberOfMessages, CancellationToken ct)
+        {
+            var messageIds = new List<MessageId>(numberOfMessages);
+
+            await foreach (var message in consumer.Messages(ct))
+            {
+                messageIds.Add(message.MessageId);
 
-                    await consumer.AcknowledgeCumulative(message, ct).ConfigureAwait(false);
+                if (messageIds.Count != numberOfMessages)
+                    continue;
 
-                    break;
-                }
+                await consumer.AcknowledgeCumulative(message, ct);
 
-                return ids;
+                break;
             }
+
+            return messageIds;
         }
     }
 }
diff --git a/tests/DotPulsar.StressTests/DotPulsar.StressTests.csproj b/tests/DotPulsar.StressTests/DotPulsar.StressTests.csproj
index 89365a4..5650dd6 100644
--- a/tests/DotPulsar.StressTests/DotPulsar.StressTests.csproj
+++ b/tests/DotPulsar.StressTests/DotPulsar.StressTests.csproj
@@ -1,7 +1,7 @@
-<Project Sdk="Microsoft.NET.Sdk">
+<Project Sdk="Microsoft.NET.Sdk">
 
   <PropertyGroup>
-    <TargetFrameworks>netcoreapp3.0;netcoreapp22</TargetFrameworks>
+    <TargetFramework>net5.0</TargetFramework>
     <IsPackable>false</IsPackable>
   </PropertyGroup>
 
diff --git a/tests/DotPulsar.StressTests/EnumerableTaskExtensions.cs b/tests/DotPulsar.StressTests/EnumerableTaskExtensions.cs
index 000b08d..7a7d23a 100644
--- a/tests/DotPulsar.StressTests/EnumerableTaskExtensions.cs
+++ b/tests/DotPulsar.StressTests/EnumerableTaskExtensions.cs
@@ -25,62 +25,60 @@ namespace DotPulsar.StressTests
     public static class EnumerableValueTaskExtensions
     {
         [DebuggerStepThrough]
-        public static async ValueTask<TResult[]> WhenAll<TResult>(this IEnumerable<ValueTask<TResult>> source)
+        public static async ValueTask<TResult[]> WhenAll<TResult>(this IEnumerable<ValueTask<TResult>> source) where TResult : notnull
         {
             // the volatile property IsCompleted must be accessed only once
             var tasks = source.Select(GetInfo).ToArray();
 
             // run incomplete tasks
-            await tasks
-                .Where(x => x.IsTask).Select(t => t.Task)
-                .WhenAll()
-                .ConfigureAwait(false);
+            foreach (var task in tasks)
+            {
+                if (task.Task is not null && !task.Task.IsCompleted)
+                    await task.Task.ConfigureAwait(false);
+            }
 
             // return ordered mixed tasks \m/
             return tasks
-                .Select(x => x.IsTask ? x.Task.Result : x.Result)
+                .Select(x => x.Task is not null ? x.Task.Result : x.Result)
                 .ToArray();
         }
 
         [DebuggerStepThrough]
-        public static async Task<TResult[]> WhenAllAsTask<TResult>(this IEnumerable<ValueTask<TResult>> source)
+        public static async Task<TResult[]> WhenAllAsTask<TResult>(this IEnumerable<ValueTask<TResult>> source) where TResult : notnull
             => await source.WhenAll().ConfigureAwait(false);
 
         [DebuggerStepThrough]
-        public static async IAsyncEnumerable<TResult> Enumerate<TResult>(this IEnumerable<ValueTask<TResult>> source)
+        public static async IAsyncEnumerable<TResult> Enumerate<TResult>(this IEnumerable<ValueTask<TResult>> source) where TResult : notnull
         {
             foreach (var operation in source.Select(GetInfo))
             {
-                yield return operation.IsTask
+                yield return operation.Task is not null
                     ? await operation.Task.ConfigureAwait(false)
                     : operation.Result;
             }
         }
 
-        private static ValueTaskInfo<TResult> GetInfo<TResult>(this ValueTask<TResult> source)
+        private static ValueTaskInfo<TResult> GetInfo<TResult>(this ValueTask<TResult> source) where TResult : notnull
             => source.IsCompleted
                 ? new ValueTaskInfo<TResult>(source.Result)
                 : new ValueTaskInfo<TResult>(source.AsTask());
 
-        private readonly struct ValueTaskInfo<TResult>
+        private readonly struct ValueTaskInfo<TResult> where TResult : notnull
         {
             public ValueTaskInfo(Task<TResult> task)
             {
-                IsTask = true;
                 Result = default;
                 Task = task;
             }
 
             public ValueTaskInfo(TResult result)
             {
-                IsTask = false;
                 Result = result;
                 Task = default;
             }
 
-            public bool IsTask { get; }
             public TResult Result { get; }
-            public Task<TResult> Task { get; }
+            public Task<TResult>? Task { get; }
         }
     }
 
diff --git a/tests/DotPulsar.StressTests/Fixtures/StandaloneClusterFixture.cs b/tests/DotPulsar.StressTests/Fixtures/StandaloneClusterFixture.cs
index 0f11c96..28b12f0 100644
--- a/tests/DotPulsar.StressTests/Fixtures/StandaloneClusterFixture.cs
+++ b/tests/DotPulsar.StressTests/Fixtures/StandaloneClusterFixture.cs
@@ -76,6 +76,8 @@ namespace DotPulsar.StressTests.Fixtures
             processStartInfo.Environment["COMPUTERNAME"] = Environment.MachineName;
 
             var process = Process.Start(processStartInfo);
+            if (process is null)
+                throw new Exception("Process.Start returned null");
 
             process.WaitForExit();
 
diff --git a/tests/DotPulsar.Tests/DotPulsar.Tests.csproj b/tests/DotPulsar.Tests/DotPulsar.Tests.csproj
index e877906..a3e617c 100644
--- a/tests/DotPulsar.Tests/DotPulsar.Tests.csproj
+++ b/tests/DotPulsar.Tests/DotPulsar.Tests.csproj
@@ -1,11 +1,12 @@
 <Project Sdk="Microsoft.NET.Sdk">
 
   <PropertyGroup>
-    <TargetFrameworks>netcoreapp3.0;netcoreapp22</TargetFrameworks>
+    <TargetFramework>net5.0</TargetFramework>
     <IsPackable>false</IsPackable>
   </PropertyGroup>
 
   <ItemGroup>
+    <PackageReference Include="FluentAssertions" Version="5.10.3" />
     <PackageReference Include="Microsoft.NET.Test.Sdk" Version="16.8.0" />
     <PackageReference Include="xunit" Version="2.4.1" />
     <PackageReference Include="xunit.runner.visualstudio" Version="2.4.3">
diff --git a/tests/DotPulsar.Tests/Internal/AsyncLockTests.cs b/tests/DotPulsar.Tests/Internal/AsyncLockTests.cs
index 3a3d8af..bbd8594 100644
--- a/tests/DotPulsar.Tests/Internal/AsyncLockTests.cs
+++ b/tests/DotPulsar.Tests/Internal/AsyncLockTests.cs
@@ -16,6 +16,7 @@ namespace DotPulsar.Tests.Internal
 {
     using DotPulsar.Internal;
     using DotPulsar.Internal.Exceptions;
+    using FluentAssertions;
     using System.Threading;
     using System.Threading.Tasks;
     using Xunit;
@@ -32,7 +33,7 @@ namespace DotPulsar.Tests.Internal
             var actual = sut.Lock(CancellationToken.None);
 
             //Assert
-            Assert.True(actual.IsCompleted);
+            actual.IsCompleted.Should().BeTrue();
 
             //Annihilate 
             actual.Result.Dispose();
@@ -50,7 +51,7 @@ namespace DotPulsar.Tests.Internal
             var actual = sut.Lock(CancellationToken.None);
 
             //Assert
-            Assert.False(actual.IsCompleted);
+            actual.IsCompleted.Should().BeFalse();
 
             //Annihilate
             alreadyTaken.Dispose();
@@ -69,7 +70,7 @@ namespace DotPulsar.Tests.Internal
             var exception = await Record.ExceptionAsync(() => sut.Lock(CancellationToken.None)).ConfigureAwait(false);
 
             //Assert
-            Assert.IsType<AsyncLockDisposedException>(exception);
+            exception.Should().BeOfType<AsyncLockDisposedException>();
         }
 
         [Fact]
@@ -85,7 +86,7 @@ namespace DotPulsar.Tests.Internal
             var exception = await Record.ExceptionAsync(() => awaiting).ConfigureAwait(false);
 
             //Assert
-            Assert.IsType<TaskCanceledException>(exception);
+            exception.Should().BeOfType<TaskCanceledException>();
 
             //Annihilate
             await sut.DisposeAsync().ConfigureAwait(false);
@@ -106,7 +107,7 @@ namespace DotPulsar.Tests.Internal
             var exception = await Record.ExceptionAsync(() => awaiting).ConfigureAwait(false);
 
             //Assert
-            Assert.IsType<TaskCanceledException>(exception);
+            exception.Should().BeOfType<TaskCanceledException>();
 
             //Annihilate
             cts.Dispose();
@@ -128,7 +129,7 @@ namespace DotPulsar.Tests.Internal
             await disposeTask.ConfigureAwait(false);
 
             //Assert
-            Assert.True(disposeTask.IsCompleted);
+            disposeTask.IsCompleted.Should().BeTrue();
 
             //Annihilate
             await sut.DisposeAsync().ConfigureAwait(false);
@@ -145,7 +146,7 @@ namespace DotPulsar.Tests.Internal
             var exception = await Record.ExceptionAsync(() => sut.DisposeAsync().AsTask()).ConfigureAwait(false); // xUnit can't record ValueTask yet
 
             //Assert
-            Assert.Null(exception);
+            exception.Should().BeNull();
         }
     }
 }
diff --git a/tests/DotPulsar.Tests/Internal/AsyncQueueTests.cs b/tests/DotPulsar.Tests/Internal/AsyncQueueTests.cs
index b15fed2..8263594 100644
--- a/tests/DotPulsar.Tests/Internal/AsyncQueueTests.cs
+++ b/tests/DotPulsar.Tests/Internal/AsyncQueueTests.cs
@@ -15,6 +15,7 @@
 namespace DotPulsar.Tests.Internal
 {
     using DotPulsar.Internal;
+    using FluentAssertions;
     using System.Threading;
     using System.Threading.Tasks;
     using Xunit;
@@ -25,16 +26,16 @@ namespace DotPulsar.Tests.Internal
         public async Task Enqueue_GivenDequeueTaskWasWaiting_ShouldCompleteDequeueTask()
         {
             //Arrange
-            const int value = 1;
+            const int expected = 1;
             var queue = new AsyncQueue<int>();
             var dequeueTask = queue.Dequeue();
-            queue.Enqueue(value);
+            queue.Enqueue(expected);
 
             //Act
             var actual = await dequeueTask.ConfigureAwait(false);
 
             //Assert
-            Assert.Equal(value, actual);
+            actual.Should().Be(expected);
 
             //Annihilate
             queue.Dispose();
@@ -44,15 +45,15 @@ namespace DotPulsar.Tests.Internal
         public async Task DequeueAsync_GivenQueueWasNotEmpty_ShouldCompleteDequeueTask()
         {
             //Arrange
-            const int value = 1;
+            const int expected = 1;
             var queue = new AsyncQueue<int>();
-            queue.Enqueue(value);
+            queue.Enqueue(expected);
 
             //Act
             var actual = await queue.Dequeue().ConfigureAwait(false);
 
             //Assert
-            Assert.Equal(value, actual);
+            actual.Should().Be(expected);
 
             //Annihilate
             queue.Dispose();
@@ -62,20 +63,20 @@ namespace DotPulsar.Tests.Internal
         public async Task DequeueAsync_GivenMultipleDequeues_ShouldCompleteInOrderedSequence()
         {
             //Arrange
-            const int value1 = 1, value2 = 2;
+            const int expected1 = 1, expected2 = 2;
             var queue = new AsyncQueue<int>();
             var dequeue1 = queue.Dequeue();
             var dequeue2 = queue.Dequeue();
-            queue.Enqueue(value1);
-            queue.Enqueue(value2);
+            queue.Enqueue(expected1);
+            queue.Enqueue(expected2);
 
             //Act
             var actual1 = await dequeue1.ConfigureAwait(false);
             var actual2 = await dequeue2.ConfigureAwait(false);
 
             //Assert
-            Assert.Equal(value1, actual1);
-            Assert.Equal(value2, actual2);
+            actual1.Should().Be(expected1);
+            actual2.Should().Be(expected2);
 
             //Annihilate
             queue.Dispose();
@@ -85,18 +86,18 @@ namespace DotPulsar.Tests.Internal
         public async Task DequeueAsync_GivenSequenceOfInput_ShouldReturnSameSequenceOfOutput()
         {
             //Arrange
-            const int value1 = 1, value2 = 2;
+            const int expected1 = 1, expected2 = 2;
             var queue = new AsyncQueue<int>();
-            queue.Enqueue(value1);
-            queue.Enqueue(value2);
+            queue.Enqueue(expected1);
+            queue.Enqueue(expected2);
 
             //Act
             var actual1 = await queue.Dequeue().ConfigureAwait(false);
             var actual2 = await queue.Dequeue().ConfigureAwait(false);
 
             //Assert
-            Assert.Equal(value1, actual1);
-            Assert.Equal(value2, actual2);
+            actual1.Should().Be(expected1);
+            actual2.Should().Be(expected2);
 
             //Annihilate
             queue.Dispose();
@@ -119,8 +120,8 @@ namespace DotPulsar.Tests.Internal
             await task2.ConfigureAwait(false);
 
             //Assert
-            Assert.IsType<TaskCanceledException>(exception);
-            Assert.Equal(excepted, task2.Result);
+            exception.Should().BeOfType<TaskCanceledException>();
+            task2.Result.Should().Be(excepted);
 
             //Annihilate
             source1.Dispose();
diff --git a/tests/DotPulsar.Tests/Internal/Crc32CTests.cs b/tests/DotPulsar.Tests/Internal/Crc32CTests.cs
index b3d5fd5..63ca663 100644
--- a/tests/DotPulsar.Tests/Internal/Crc32CTests.cs
+++ b/tests/DotPulsar.Tests/Internal/Crc32CTests.cs
@@ -15,6 +15,7 @@
 namespace DotPulsar.Tests.Internal
 {
     using DotPulsar.Internal;
+    using FluentAssertions;
     using Xunit;
 
     public class Crc32CTests
@@ -32,7 +33,7 @@ namespace DotPulsar.Tests.Internal
 
             //Assert
             const uint expected = 2355953212;
-            Assert.Equal(expected, actual);
+            actual.Should().Be(expected);
         }
 
         [Fact]
@@ -55,8 +56,7 @@ namespace DotPulsar.Tests.Internal
 
             //Assert
             const uint expected = 1079987866;
-
-            Assert.Equal(expected, actual);
+            actual.Should().Be(expected);
         }
     }
 }
diff --git a/tests/DotPulsar.Tests/Internal/Extensions/ReadOnlySequenceExtensionsTests.cs b/tests/DotPulsar.Tests/Internal/Extensions/ReadOnlySequenceExtensionsTests.cs
index 91f2c7b..b84268c 100644
--- a/tests/DotPulsar.Tests/Internal/Extensions/ReadOnlySequenceExtensionsTests.cs
+++ b/tests/DotPulsar.Tests/Internal/Extensions/ReadOnlySequenceExtensionsTests.cs
@@ -16,6 +16,7 @@ namespace DotPulsar.Tests.Internal.Extensions
 {
     using DotPulsar.Internal;
     using DotPulsar.Internal.Extensions;
+    using FluentAssertions;
     using Xunit;
 
     public class ReadOnlySequenceExtensionsTests
@@ -30,7 +31,7 @@ namespace DotPulsar.Tests.Internal.Extensions
             var actual = sequence.StartsWith(new byte[] { 0x00, 0x01 });
 
             //Assert
-            Assert.False(actual);
+            actual.Should().BeFalse();
         }
 
         [Fact]
@@ -43,7 +44,7 @@ namespace DotPulsar.Tests.Internal.Extensions
             var actual = sequence.StartsWith(new byte[] { 0x00, 0x01 });
 
             //Assert
-            Assert.False(actual);
+            actual.Should().BeFalse();
         }
 
         [Fact]
@@ -56,7 +57,7 @@ namespace DotPulsar.Tests.Internal.Extensions
             var actual = sequence.StartsWith(new byte[] { 0x00, 0x01 });
 
             //Assert
-            Assert.True(actual);
+            actual.Should().BeTrue();
         }
 
         [Fact]
@@ -69,7 +70,7 @@ namespace DotPulsar.Tests.Internal.Extensions
             var actual = sequence.StartsWith(new byte[] { 0x00, 0x01, 0x02, 0x03 });
 
             //Assert
-            Assert.False(actual);
+            actual.Should().BeFalse();
         }
 
         [Fact]
@@ -82,7 +83,7 @@ namespace DotPulsar.Tests.Internal.Extensions
             var actual = sequence.StartsWith(new byte[] { 0x00, 0x01, 0x02 });
 
             //Assert
-            Assert.False(actual);
+            actual.Should().BeFalse();
         }
 
         [Fact]
@@ -95,7 +96,7 @@ namespace DotPulsar.Tests.Internal.Extensions
             var actual = sequence.StartsWith(new byte[] { 0x00, 0x01, 0x02 });
 
             //Assert
-            Assert.True(actual);
+            actual.Should().BeTrue();
         }
 
         [Fact]
@@ -109,7 +110,7 @@ namespace DotPulsar.Tests.Internal.Extensions
 
             //Assert
             const uint expected = 66051;
-            Assert.Equal(expected, actual);
+            actual.Should().Be(expected);
         }
 
         [Fact]
@@ -123,7 +124,7 @@ namespace DotPulsar.Tests.Internal.Extensions
 
             //Assert
             const uint expected = 66051;
-            Assert.Equal(expected, actual);
+            actual.Should().Be(expected);
         }
 
         [Fact]
@@ -137,7 +138,7 @@ namespace DotPulsar.Tests.Internal.Extensions
 
             //Assert
             const uint expected = 66051;
-            Assert.Equal(expected, actual);
+            actual.Should().Be(expected);
         }
 
         [Fact]
@@ -154,7 +155,7 @@ namespace DotPulsar.Tests.Internal.Extensions
 
             //Assert
             const uint expected = 66051;
-            Assert.Equal(expected, actual);
+            actual.Should().Be(expected);
         }
     }
 }
diff --git a/tests/DotPulsar.Tests/Internal/SequenceBuilderTests.cs b/tests/DotPulsar.Tests/Internal/SequenceBuilderTests.cs
index f8c649c..00555c1 100644
--- a/tests/DotPulsar.Tests/Internal/SequenceBuilderTests.cs
+++ b/tests/DotPulsar.Tests/Internal/SequenceBuilderTests.cs
@@ -15,7 +15,9 @@
 namespace DotPulsar.Tests.Internal
 {
     using DotPulsar.Internal;
+    using FluentAssertions;
     using System.Buffers;
+    using System.Linq;
     using Xunit;
 
     public class SequenceBuilderTests
@@ -31,13 +33,11 @@ namespace DotPulsar.Tests.Internal
             var builder = new SequenceBuilder<byte>().Append(a).Append(b).Append(c);
 
             //Act
-            var sequence = builder.Build();
+            var actual = builder.Build().ToArray();
 
             //Assert
-            var array = sequence.ToArray();
-
-            for (byte i = 0; i < array.Length; ++i)
-                Assert.Equal(i, array[i]);
+            var expected = Enumerable.Range(0, 10).ToArray();
+            actual.Should().Equal(expected);
         }
 
         [Fact]
@@ -54,13 +54,11 @@ namespace DotPulsar.Tests.Internal
             var builder = new SequenceBuilder<byte>().Append(a).Append(seq).Append(e);
 
             //Act
-            var sequence = builder.Build();
+            var actual = builder.Build().ToArray();
 
             //Assert
-            var array = sequence.ToArray();
-
-            for (byte i = 0; i < array.Length; ++i)
-                Assert.Equal(i, array[i]);
+            var expected = Enumerable.Range(0, 10).ToArray();
+            actual.Should().Equal(expected);
         }
 
         [Fact]
@@ -74,13 +72,11 @@ namespace DotPulsar.Tests.Internal
             var builder = new SequenceBuilder<byte>().Prepend(c).Prepend(b).Prepend(a);
 
             //Act
-            var sequence = builder.Build();
+            var actual = builder.Build().ToArray();
 
             //Assert
-            var array = sequence.ToArray();
-
-            for (byte i = 0; i < array.Length; ++i)
-                Assert.Equal(i, array[i]);
+            var expected = Enumerable.Range(0, 10).ToArray();
+            actual.Should().Equal(expected);
         }
 
         [Fact]
@@ -97,13 +93,30 @@ namespace DotPulsar.Tests.Internal
             var builder = new SequenceBuilder<byte>().Prepend(e).Prepend(seq).Prepend(a);
 
             //Act
-            var sequence = builder.Build();
+            var actual = builder.Build().ToArray();
 
             //Assert
-            var array = sequence.ToArray();
+            var expected = Enumerable.Range(0, 10).ToArray();
+            actual.Should().Equal(expected);
+        }
+
+        [Fact]
+        public void Build_GivenMultipleInvocations_ShouldCreateIdenticalSequences()
+        {
+            //Arrange
+            var a = new byte[] { 0x00, 0x01 };
+            var b = new byte[] { 0x02, 0x03 };
+
+            var builder = new SequenceBuilder<byte>().Append(a).Append(b);
 
-            for (byte i = 0; i < array.Length; ++i)
-                Assert.Equal(i, array[i]);
+            //Act
+            var actual1 = builder.Build().ToArray();
+            var actual2 = builder.Build().ToArray();
+
+            //Assert
+            var expected = Enumerable.Range(0, 4).ToArray();
+            actual1.Should().Equal(expected);
+            actual2.Should().Equal(expected);
         }
     }
 }
diff --git a/tests/DotPulsar.Tests/Internal/SerializerTests.cs b/tests/DotPulsar.Tests/Internal/SerializerTests.cs
index ac157f3..8968894 100644
--- a/tests/DotPulsar.Tests/Internal/SerializerTests.cs
+++ b/tests/DotPulsar.Tests/Internal/SerializerTests.cs
@@ -15,6 +15,7 @@
 namespace DotPulsar.Tests.Internal
 {
     using DotPulsar.Internal;
+    using FluentAssertions;
     using Xunit;
 
     public class SerializerTests
@@ -30,8 +31,7 @@ namespace DotPulsar.Tests.Internal
 
             //Assert
             var expected = new byte[] { 0x00, 0x01, 0x02, 0x03 };
-
-            Assert.Equal(expected, actual);
+            actual.Should().Equal(expected);
         }
     }
 }
diff --git a/tests/DotPulsar.Tests/Internal/StateManagerTests.cs b/tests/DotPulsar.Tests/Internal/StateManagerTests.cs
index b813fa4..c95ac38 100644
--- a/tests/DotPulsar.Tests/Internal/StateManagerTests.cs
+++ b/tests/DotPulsar.Tests/Internal/StateManagerTests.cs
@@ -15,6 +15,7 @@
 namespace DotPulsar.Tests.Internal
 {
     using DotPulsar.Internal;
+    using FluentAssertions;
     using System.Threading;
     using System.Threading.Tasks;
     using Xunit;
@@ -40,7 +41,7 @@ namespace DotPulsar.Tests.Internal
             var actual = sut.SetState(newState);
 
             //Assert
-            Assert.Equal(expected, actual);
+            actual.Should().Be(expected);
         }
 
         [Theory]
@@ -56,7 +57,7 @@ namespace DotPulsar.Tests.Internal
             _ = sut.SetState(newState);
 
             //Assert
-            Assert.Equal(ProducerState.Closed, sut.CurrentState);
+            sut.CurrentState.Should().Be(ProducerState.Closed);
         }
 
         [Theory]
@@ -74,7 +75,7 @@ namespace DotPulsar.Tests.Internal
             _ = sut.SetState(newState);
 
             //Assert
-            Assert.True(task.IsCompleted);
+            task.IsCompleted.Should().BeTrue();
         }
 
         [Theory]
@@ -92,7 +93,7 @@ namespace DotPulsar.Tests.Internal
             _ = sut.SetState(newState);
 
             //Assert
-            Assert.True(task.IsCompleted);
+            task.IsCompleted.Should().BeTrue();
         }
 
         [Theory]
@@ -125,7 +126,7 @@ namespace DotPulsar.Tests.Internal
             var task = sut.StateChangedTo(wantedState, default);
 
             //Assert
-            Assert.False(task.IsCompleted);
+            task.IsCompleted.Should().BeFalse();
         }
 
         [Theory]
@@ -140,7 +141,7 @@ namespace DotPulsar.Tests.Internal
             var task = sut.StateChangedTo(state, default);
 
             //Assert
-            Assert.True(task.IsCompleted);
+            task.IsCompleted.Should().BeTrue();
         }
 
         [Theory]
@@ -155,7 +156,7 @@ namespace DotPulsar.Tests.Internal
             var task = sut.StateChangedFrom(state, default);
 
             //Assert
-            Assert.False(task.IsCompleted);
+            task.IsCompleted.Should().BeFalse();
         }
 
         [Theory]
@@ -172,7 +173,7 @@ namespace DotPulsar.Tests.Internal
             var task = sut.StateChangedFrom(fromState, default);
 
             //Assert
-            Assert.True(task.IsCompleted);
+            task.IsCompleted.Should().BeTrue();
         }
 
         [Theory]
@@ -188,7 +189,7 @@ namespace DotPulsar.Tests.Internal
             var task = sut.StateChangedFrom(state, default);
 
             //Assert
-            Assert.True(task.IsCompleted);
+            task.IsCompleted.Should().BeTrue();
         }
 
         [Theory]
@@ -204,7 +205,7 @@ namespace DotPulsar.Tests.Internal
             _ = sut.SetState(ProducerState.Closed);
 
             //Assert
-            Assert.True(task.IsCompleted);
+            task.IsCompleted.Should().BeTrue();
         }
 
         [Theory]
@@ -220,7 +221,7 @@ namespace DotPulsar.Tests.Internal
             _ = sut.SetState(newState);
 
             //Assert
-            Assert.False(task.IsCompleted);
+            task.IsCompleted.Should().BeFalse();
         }
 
         [Fact]
@@ -236,7 +237,7 @@ namespace DotPulsar.Tests.Internal
             var exception = await Record.ExceptionAsync(() => task.AsTask()).ConfigureAwait(false); // xUnit can't record ValueTask yet
 
             //Assert
-            Assert.IsType<TaskCanceledException>(exception);
+            exception.Should().BeOfType<TaskCanceledException>();
 
             //Annihilate
             cts.Dispose();
diff --git a/tests/DotPulsar.Tests/MessageIdTests.cs b/tests/DotPulsar.Tests/MessageIdTests.cs
index 7b1652b..b3d1077 100644
--- a/tests/DotPulsar.Tests/MessageIdTests.cs
+++ b/tests/DotPulsar.Tests/MessageIdTests.cs
@@ -18,6 +18,7 @@
 namespace DotPulsar.Tests
 {
     using DotPulsar;
+    using FluentAssertions;
     using Xunit;
 
     public class MessageIdTests
@@ -28,9 +29,9 @@ namespace DotPulsar.Tests
             var m1 = new MessageId(1234, 5678, 9876, 5432);
             var m2 = m1;
 
-            Assert.True(m1.Equals(m2));
-            Assert.True(m1 == m2);
-            Assert.False(m1 != m2);
+            m1.Equals(m2).Should().BeTrue();
+            (m1 == m2).Should().BeTrue();
+            (m1 != m2).Should().BeFalse();
         }
 
         [Fact]
@@ -39,9 +40,9 @@ namespace DotPulsar.Tests
             var m1 = new MessageId(1234, 5678, 9876, 5432);
             var m2 = new MessageId(1234, 5678, 9876, 5432);
 
-            Assert.True(m1.Equals(m2));
-            Assert.True(m1 == m2);
-            Assert.False(m1 != m2);
+            m1.Equals(m2).Should().BeTrue();
+            (m1 == m2).Should().BeTrue();
+            (m1 != m2).Should().BeFalse();
         }
 
         [Fact]
@@ -50,9 +51,9 @@ namespace DotPulsar.Tests
             var m1 = new MessageId(1234, 5678, 9876, 5432);
             var m2 = new MessageId(9876, 6432, 1234, 6678);
 
-            Assert.False(m1.Equals(m2));
-            Assert.False(m1 == m2);
-            Assert.True(m1 != m2);
+            m1.Equals(m2).Should().BeFalse();
+            (m1 == m2).Should().BeFalse();
+            (m1 != m2).Should().BeTrue();
         }
 
         [Fact]
@@ -61,9 +62,9 @@ namespace DotPulsar.Tests
             MessageId m1 = null;
             MessageId m2 = null;
 
-            Assert.True(m1 == m2);
-            Assert.True(m1 == null);
-            Assert.False(m1 != m2);
+            (m1 == m2).Should().BeTrue();
+            (m1 == null).Should().BeTrue();
+            (m1 != m2).Should().BeFalse();
         }
 
         [Fact]
@@ -72,10 +73,10 @@ namespace DotPulsar.Tests
             var m1 = new MessageId(1234, 5678, 9876, 5432);
             MessageId m2 = null;
 
-            Assert.False(m1 == null);
-            Assert.False(m1 == m2);
-            Assert.False(m1.Equals(m2));
-            Assert.True(m1 != m2);
+            (m1 == null).Should().BeFalse();
+            (m1 == m2).Should().BeFalse();
+            m1.Equals(m2).Should().BeFalse();
+            (m1 != m2).Should().BeTrue();
         }
     }
 }
diff --git a/tests/docker-compose-standalone-tests.yml b/tests/docker-compose-standalone-tests.yml
index 06d0a06..5dde15b 100644
--- a/tests/docker-compose-standalone-tests.yml
+++ b/tests/docker-compose-standalone-tests.yml
@@ -4,7 +4,7 @@ services:
 
   pulsar:
     container_name: pulsar-stresstests
-    image: 'apachepulsar/pulsar:2.6.0'
+    image: 'apachepulsar/pulsar:2.6.1'
     ports:
       - '54546:8080'
       - '54545:6650'