You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by bl...@apache.org on 2022/02/02 12:33:14 UTC

[pulsar-dotpulsar] branch master updated: Fixed the tests

This is an automated email from the ASF dual-hosted git repository.

blankensteiner pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-dotpulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 6c60892  Fixed the tests
6c60892 is described below

commit 6c6089237bf59a9cbf99a88d0b0e87d0904a72cf
Author: Daniel Blankensteiner <db...@vmail.dk>
AuthorDate: Wed Feb 2 13:33:03 2022 +0100

    Fixed the tests
---
 DotPulsar.sln                                      |  13 +-
 samples/Consuming/Program.cs                       |   2 +-
 samples/Producing/Program.cs                       |   2 +-
 samples/Reading/Program.cs                         |   2 +-
 src/DotPulsar/Internal/Connection.cs               |   4 +-
 .../Abstraction/IPulsarService.cs                  |  43 -----
 .../DotPulsar.IntegrationTests.csproj              |   8 +-
 .../Fixtures/StandaloneClusterFixture.cs           |  45 -----
 .../Fixtures/StandaloneTokenClusterFixture.cs      |  99 -----------
 .../Fixtures/StandaloneTokenClusterTests.cs        |  20 ---
 tests/DotPulsar.IntegrationTests/ProducerTests.cs  |  34 ++--
 .../Services/PulsarServiceBase.cs                  |  68 --------
 .../Services/ServiceFactory.cs                     |  36 ----
 .../Services/StandaloneContainerService.cs         |  68 --------
 .../Services/StandaloneExternalService.cs          |  31 ----
 ...loneClusterTests.cs => StandaloneCollection.cs} |   6 +-
 .../StandaloneFixture.cs                           | 121 +++++++++++++
 .../TokenRefreshTests.cs                           | 191 ---------------------
 tests/DotPulsar.IntegrationTests/TokenTests.cs     | 162 +++++++++++++++++
 .../docker-compose-standalone-tests.yml            |  21 ---
 ...e-standalone-token-tests.yml => standalone.yml} |  24 +--
 .../DotPulsar.StressTests.csproj                   |   4 +-
 .../EnumerableTaskExtensions.cs                    |   4 +-
 .../Fixtures/StandaloneClusterFixture.cs           |  35 ++--
 .../DotPulsar.StressTests/XunitExceptionHandler.cs |   5 +-
 .../DotPulsar.TestHelpers.csproj                   |   9 +
 .../ProcessAsyncHelper.cs                          |  23 +--
 tests/DotPulsar.Tests/DotPulsar.Tests.csproj       |   2 +-
 28 files changed, 370 insertions(+), 712 deletions(-)

diff --git a/DotPulsar.sln b/DotPulsar.sln
index 9c4a0e3..f273d9e 100644
--- a/DotPulsar.sln
+++ b/DotPulsar.sln
@@ -1,7 +1,7 @@
 
 Microsoft Visual Studio Solution File, Format Version 12.00
-# Visual Studio Version 16
-VisualStudioVersion = 16.0.29209.62
+# Visual Studio Version 17
+VisualStudioVersion = 17.0.32112.339
 MinimumVisualStudioVersion = 10.0.40219.1
 Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "DotPulsar", "src\DotPulsar\DotPulsar.csproj", "{8D300FC3-2E35-4D23-B0DD-FEE9E153330A}"
 EndProject
@@ -26,7 +26,9 @@ Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "Solution Items", "Solution
 		README.md = README.md
 	EndProjectSection
 EndProject
-Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "DotPulsar.IntegrationTests", "tests\DotPulsar.IntegrationTests\DotPulsar.IntegrationTests.csproj", "{B44E52DB-DB45-4E31-AA2C-68E5C52AFDEB}"
+Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "DotPulsar.IntegrationTests", "tests\DotPulsar.IntegrationTests\DotPulsar.IntegrationTests.csproj", "{B44E52DB-DB45-4E31-AA2C-68E5C52AFDEB}"
+EndProject
+Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "DotPulsar.TestHelpers", "tests\DotPulsar.TestHelpers\DotPulsar.TestHelpers.csproj", "{F75D1A07-BBED-411C-A900-357790717D8F}"
 EndProject
 Global
 	GlobalSection(SolutionConfigurationPlatforms) = preSolution
@@ -62,6 +64,10 @@ Global
 		{B44E52DB-DB45-4E31-AA2C-68E5C52AFDEB}.Debug|Any CPU.Build.0 = Debug|Any CPU
 		{B44E52DB-DB45-4E31-AA2C-68E5C52AFDEB}.Release|Any CPU.ActiveCfg = Release|Any CPU
 		{B44E52DB-DB45-4E31-AA2C-68E5C52AFDEB}.Release|Any CPU.Build.0 = Release|Any CPU
+		{F75D1A07-BBED-411C-A900-357790717D8F}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
+		{F75D1A07-BBED-411C-A900-357790717D8F}.Debug|Any CPU.Build.0 = Debug|Any CPU
+		{F75D1A07-BBED-411C-A900-357790717D8F}.Release|Any CPU.ActiveCfg = Release|Any CPU
+		{F75D1A07-BBED-411C-A900-357790717D8F}.Release|Any CPU.Build.0 = Release|Any CPU
 	EndGlobalSection
 	GlobalSection(SolutionProperties) = preSolution
 		HideSolutionNode = FALSE
@@ -73,6 +79,7 @@ Global
 		{14934BED-A222-47B2-A58A-CFC4AAB89B49} = {E7106D0F-B255-4631-9FB8-734FC5748FA9}
 		{6D44683B-865C-4D15-9F0A-1A8441354589} = {E7106D0F-B255-4631-9FB8-734FC5748FA9}
 		{B44E52DB-DB45-4E31-AA2C-68E5C52AFDEB} = {E1C932A9-6D4C-4DDF-8922-BE7B71F12F1C}
+		{F75D1A07-BBED-411C-A900-357790717D8F} = {E1C932A9-6D4C-4DDF-8922-BE7B71F12F1C}
 	EndGlobalSection
 	GlobalSection(ExtensibilityGlobals) = postSolution
 		SolutionGuid = {88355922-E70A-4B73-B7F8-ABF8F2B59789}
diff --git a/samples/Consuming/Program.cs b/samples/Consuming/Program.cs
index ed2bb74..c16dda1 100644
--- a/samples/Consuming/Program.cs
+++ b/samples/Consuming/Program.cs
@@ -75,6 +75,6 @@ internal static class Program
         };
 
         var topic = stateChanged.Consumer.Topic;
-        Console.WriteLine($"The consumer for topic '{topic}' " + stateMessage);
+        Console.WriteLine($"The consumer for topic '{topic}' {stateMessage}");
     }
 }
diff --git a/samples/Producing/Program.cs b/samples/Producing/Program.cs
index 14749b7..0dce9f5 100644
--- a/samples/Producing/Program.cs
+++ b/samples/Producing/Program.cs
@@ -78,6 +78,6 @@ internal static class Program
         };
 
         var topic = stateChanged.Producer.Topic;
-        Console.WriteLine($"The producer for topic '{topic}' " + stateMessage);
+        Console.WriteLine($"The producer for topic '{topic}' {stateMessage}");
     }
 }
diff --git a/samples/Reading/Program.cs b/samples/Reading/Program.cs
index b76b1b8..d00ad5c 100644
--- a/samples/Reading/Program.cs
+++ b/samples/Reading/Program.cs
@@ -73,6 +73,6 @@ internal static class Program
         };
 
         var topic = stateChanged.Reader.Topic;
-        Console.WriteLine($"The reader for topic '{topic}' " + stateMessage);
+        Console.WriteLine($"The reader for topic '{topic}' {stateMessage}");
     }
 }
diff --git a/src/DotPulsar/Internal/Connection.cs b/src/DotPulsar/Internal/Connection.cs
index 5b35c7d..adb1d34 100644
--- a/src/DotPulsar/Internal/Connection.cs
+++ b/src/DotPulsar/Internal/Connection.cs
@@ -86,6 +86,8 @@ public sealed class Connection : IConnection
 
     private async Task Send(CommandAuthResponse command, CancellationToken cancellationToken)
     {
+        await Task.Yield();
+
         if (_authentication is not null)
         {
             if (command.Response is null)
@@ -308,7 +310,7 @@ public sealed class Connection : IConnection
                 if (command.CommandType == BaseCommand.Type.Message)
                     _channelManager.Incoming(command.Message, new ReadOnlySequence<byte>(frame.Slice(commandSize + 4).ToArray()));
                 else if (command.CommandType == BaseCommand.Type.AuthChallenge)
-                    await Send(new CommandAuthResponse(), cancellationToken).ConfigureAwait(false);
+                    _ = Send(new CommandAuthResponse(), cancellationToken).ConfigureAwait(false);
                 else
                     _channelManager.Incoming(command);
             }
diff --git a/tests/DotPulsar.IntegrationTests/Abstraction/IPulsarService.cs b/tests/DotPulsar.IntegrationTests/Abstraction/IPulsarService.cs
deleted file mode 100644
index 4c9332f..0000000
--- a/tests/DotPulsar.IntegrationTests/Abstraction/IPulsarService.cs
+++ /dev/null
@@ -1,43 +0,0 @@
-/*
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-namespace DotPulsar.IntegrationTests.Abstraction;
-
-using System;
-using System.Net.Http;
-using System.Threading.Tasks;
-using Xunit;
-
-/// <summary>
-/// Pulsar Service interface
-/// </summary>
-public interface IPulsarService : IAsyncLifetime
-{
-    /// <summary>
-    /// Get broker binary protocol uri
-    /// </summary>
-    Uri GetBrokerUri();
-
-    /// <summary>
-    /// Get broker rest uri
-    /// </summary>
-    Uri GetWebServiceUri();
-
-    /// <summary>
-    /// Create a partitioned topic
-    /// The format of the restTopic must be `{schema}/{tenant}/{namespace}/{topicName}`
-    /// For example, `persistent/public/default/test-topic`
-    /// </summary>
-    Task<HttpResponseMessage?> CreatePartitionedTopic(string restTopic, int numPartitions);
-}
diff --git a/tests/DotPulsar.IntegrationTests/DotPulsar.IntegrationTests.csproj b/tests/DotPulsar.IntegrationTests/DotPulsar.IntegrationTests.csproj
index 7ce98c4..e7335e5 100644
--- a/tests/DotPulsar.IntegrationTests/DotPulsar.IntegrationTests.csproj
+++ b/tests/DotPulsar.IntegrationTests/DotPulsar.IntegrationTests.csproj
@@ -14,7 +14,7 @@
             <IncludeAssets>runtime; build; native; contentfiles; analyzers; buildtransitive</IncludeAssets>
             <PrivateAssets>all</PrivateAssets>
         </PackageReference>
-        <PackageReference Include="coverlet.collector" Version="3.1.0">
+        <PackageReference Include="coverlet.collector" Version="3.1.1">
             <IncludeAssets>runtime; build; native; contentfiles; analyzers; buildtransitive</IncludeAssets>
             <PrivateAssets>all</PrivateAssets>
         </PackageReference>
@@ -22,13 +22,11 @@
 
     <ItemGroup>
       <ProjectReference Include="..\..\src\DotPulsar\DotPulsar.csproj" />
+      <ProjectReference Include="..\DotPulsar.TestHelpers\DotPulsar.TestHelpers.csproj" />
     </ItemGroup>
 
     <ItemGroup>
-      <None Update="docker-compose-standalone-tests.yml">
-        <CopyToOutputDirectory>Always</CopyToOutputDirectory>
-      </None>
-      <None Update="docker-compose-standalone-token-tests.yml">
+      <None Update="standalone.yml">
         <CopyToOutputDirectory>Always</CopyToOutputDirectory>
       </None>
       <None Update="appdata\my-secret.key">
diff --git a/tests/DotPulsar.IntegrationTests/Fixtures/StandaloneClusterFixture.cs b/tests/DotPulsar.IntegrationTests/Fixtures/StandaloneClusterFixture.cs
deleted file mode 100644
index eeb3090..0000000
--- a/tests/DotPulsar.IntegrationTests/Fixtures/StandaloneClusterFixture.cs
+++ /dev/null
@@ -1,45 +0,0 @@
-/*
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-namespace DotPulsar.IntegrationTests.Fixtures;
-
-using Abstraction;
-using Services;
-using System.Threading.Tasks;
-using Xunit;
-using Xunit.Abstractions;
-
-public class StandaloneClusterFixture : IAsyncLifetime
-{
-    private readonly IMessageSink _messageSink;
-
-    public StandaloneClusterFixture(IMessageSink messageSink)
-    {
-        _messageSink = messageSink;
-    }
-
-    public IPulsarService? PulsarService { private set; get; }
-
-    public async Task InitializeAsync()
-    {
-        PulsarService = ServiceFactory.CreatePulsarService(_messageSink);
-        await PulsarService.InitializeAsync();
-    }
-
-    public async Task DisposeAsync()
-    {
-        if (PulsarService != null)
-            await PulsarService.DisposeAsync();
-    }
-}
diff --git a/tests/DotPulsar.IntegrationTests/Fixtures/StandaloneTokenClusterFixture.cs b/tests/DotPulsar.IntegrationTests/Fixtures/StandaloneTokenClusterFixture.cs
deleted file mode 100644
index f2c7884..0000000
--- a/tests/DotPulsar.IntegrationTests/Fixtures/StandaloneTokenClusterFixture.cs
+++ /dev/null
@@ -1,99 +0,0 @@
-/*
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-namespace DotPulsar.IntegrationTests.Fixtures;
-
-using Abstraction;
-using Services;
-using System;
-using System.Net.Http;
-using System.Net.Http.Headers;
-using System.Threading.Tasks;
-using Xunit.Abstractions;
-using Xunit.Sdk;
-
-public class StandaloneTokenClusterFixture : PulsarServiceBase
-{
-    private readonly IMessageSink _messageSink;
-
-    public StandaloneTokenClusterFixture(IMessageSink messageSink) : base(messageSink)
-    {
-        _messageSink = messageSink;
-    }
-
-    public IPulsarService PulsarService => this;
-
-    public override async Task InitializeAsync()
-    {
-        await TakeDownPulsar(); // clean-up if anything was left running from previous run
-
-        await ProcessAsyncHelper
-            .ExecuteShellCommand("docker-compose", "-f docker-compose-standalone-token-tests.yml up -d")
-            .ThrowOnFailure();
-
-        var waitTries = 10;
-
-        using var handler = new HttpClientHandler { AllowAutoRedirect = true };
-
-        using var client = new HttpClient(handler);
-
-        var token = await GetAuthToken(false);
-
-        client.DefaultRequestHeaders.Authorization = new AuthenticationHeaderValue("Bearer", token);
-
-        while (waitTries > 0)
-        {
-            try
-            {
-                await client.GetAsync($"{PulsarService.GetWebServiceUri()}/metrics/").ConfigureAwait(false);
-                return;
-            }
-            catch (Exception e)
-            {
-                _messageSink.OnMessage(new DiagnosticMessage("Error trying to fetch metrics: {0}", e));
-                waitTries--;
-                await Task.Delay(5000).ConfigureAwait(false);
-            }
-        }
-
-        throw new Exception("Unable to confirm Pulsar has initialized");
-    }
-
-    protected override async Task OnDispose()
-        => await TakeDownPulsar();
-
-    public override Uri GetBrokerUri() => new("pulsar://localhost:54547");
-
-    public override Uri GetWebServiceUri() => new("http://localhost:54548");
-
-    private Task TakeDownPulsar()
-        => ProcessAsyncHelper
-        .ExecuteShellCommand("docker-compose", "-f docker-compose-standalone-token-tests.yml down")
-        .LogFailure(s => MessageSink.OnMessage(new DiagnosticMessage("Error bringing down container: {0}", s)));
-
-    public static async Task<string> GetAuthToken(bool includeExpiry)
-    {
-        var arguments = "exec pulsar-tokens bin/pulsar tokens create --secret-key file:///appdata/my-secret.key --subject test-user";
-
-        if (includeExpiry)
-            arguments += " --expiry-time 10s";
-
-        var tokenCreateRequest = await ProcessAsyncHelper.ExecuteShellCommand("docker", arguments);
-
-        if (!tokenCreateRequest.Completed)
-            throw new InvalidOperationException($"Getting token from container failed: {tokenCreateRequest.Output}");
-
-        return tokenCreateRequest.Output;
-    }
-}
diff --git a/tests/DotPulsar.IntegrationTests/Fixtures/StandaloneTokenClusterTests.cs b/tests/DotPulsar.IntegrationTests/Fixtures/StandaloneTokenClusterTests.cs
deleted file mode 100644
index 612e79c..0000000
--- a/tests/DotPulsar.IntegrationTests/Fixtures/StandaloneTokenClusterTests.cs
+++ /dev/null
@@ -1,20 +0,0 @@
-/*
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-namespace DotPulsar.IntegrationTests.Fixtures;
-
-using Xunit;
-
-[CollectionDefinition(nameof(StandaloneTokenClusterTests))]
-public class StandaloneTokenClusterTests : ICollectionFixture<StandaloneTokenClusterFixture> { }
diff --git a/tests/DotPulsar.IntegrationTests/ProducerTests.cs b/tests/DotPulsar.IntegrationTests/ProducerTests.cs
index 6c1ac25..10b6a02 100644
--- a/tests/DotPulsar.IntegrationTests/ProducerTests.cs
+++ b/tests/DotPulsar.IntegrationTests/ProducerTests.cs
@@ -14,36 +14,33 @@
 
 namespace DotPulsar.IntegrationTests;
 
-using Abstraction;
 using Abstractions;
 using Extensions;
-using Fixtures;
 using FluentAssertions;
 using System;
 using System.Collections.Generic;
-using System.Diagnostics;
+using System.Threading;
 using System.Threading.Tasks;
 using Xunit;
 using Xunit.Abstractions;
 
-[Collection(nameof(StandaloneClusterTest))]
+[Collection(nameof(StandaloneCollection))]
 public class ProducerTests
 {
     private readonly ITestOutputHelper _testOutputHelper;
-    private readonly IPulsarService _pulsarService;
+    private readonly StandaloneFixture _fixture;
 
-    public ProducerTests(ITestOutputHelper outputHelper, StandaloneClusterFixture fixture)
+    public ProducerTests(ITestOutputHelper outputHelper, StandaloneFixture fixture)
     {
         _testOutputHelper = outputHelper;
-        Debug.Assert(fixture.PulsarService != null, "fixture.PulsarService != null");
-        _pulsarService = fixture.PulsarService;
+        _fixture = fixture;
     }
 
     [Fact]
     public async Task SimpleProduceConsume_WhenSendingMessagesToProducer_ThenReceiveMessagesFromConsumer()
     {
         //Arrange
-        await using var client = PulsarClient.Builder().ServiceUrl(_pulsarService.GetBrokerUri()).Build();
+        await using var client = CreateClient();
         string topicName = $"simple-produce-consume{Guid.NewGuid():N}";
         const string content = "test-message";
 
@@ -69,13 +66,12 @@ public class ProducerTests
     public async Task SinglePartition_WhenSendMessages_ThenGetMessagesFromSinglePartition()
     {
         //Arrange
-        var serviceUrl = _pulsarService.GetBrokerUri();
         const string content = "test-message";
         const int partitions = 3;
         const int msgCount = 3;
         var topicName = $"single-partitioned-{Guid.NewGuid():N}";
-        await _pulsarService.CreatePartitionedTopic($"persistent/public/default/{topicName}", partitions);
-        await using var client = PulsarClient.Builder().ServiceUrl(serviceUrl).Build();
+        await _fixture.CreatePartitionedTopic($"persistent/public/default/{topicName}", partitions);
+        await using var client = CreateClient();
 
         //Act
         var consumers = new List<IConsumer<string>>();
@@ -120,19 +116,19 @@ public class ProducerTests
     public async Task RoundRobinPartition_WhenSendMessages_ThenGetMessagesFromPartitionsInOrder()
     {
         //Arrange
-        await using var client = PulsarClient.Builder().ServiceUrl(_pulsarService.GetBrokerUri()).Build();
+        await using var client = CreateClient();
+
         string topicName = $"round-robin-partitioned-{Guid.NewGuid():N}";
         const string content = "test-message";
         const int partitions = 3;
         var consumers = new List<IConsumer<string>>();
 
-        await _pulsarService.CreatePartitionedTopic($"persistent/public/default/{topicName}", partitions);
+        await _fixture.CreatePartitionedTopic($"persistent/public/default/{topicName}", partitions);
 
         //Act
         await using var producer = client.NewProducer(Schema.String)
             .Topic(topicName)
             .Create();
-        await producer.StateChangedTo(ProducerState.Connected);
 
         for (var i = 0; i < partitions; ++i)
         {
@@ -151,4 +147,12 @@ public class ProducerTests
             (await consumers[i].Receive()).Value().Should().Be($"{content}-{i}");
         }
     }
+
+    private IPulsarClient CreateClient()
+        => PulsarClient
+        .Builder()
+        .Authentication(AuthenticationFactory.Token(async ct => await _fixture.GetToken(Timeout.InfiniteTimeSpan)))
+        .ExceptionHandler(ec => _testOutputHelper.WriteLine($"Exception: {ec.Exception}"))
+        .ServiceUrl(_fixture.ServiceUrl)
+        .Build();
 }
diff --git a/tests/DotPulsar.IntegrationTests/Services/PulsarServiceBase.cs b/tests/DotPulsar.IntegrationTests/Services/PulsarServiceBase.cs
deleted file mode 100644
index c614a84..0000000
--- a/tests/DotPulsar.IntegrationTests/Services/PulsarServiceBase.cs
+++ /dev/null
@@ -1,68 +0,0 @@
-/*
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-namespace DotPulsar.IntegrationTests.Services;
-
-using Abstraction;
-using System;
-using System.Net.Http;
-using System.Text;
-using System.Threading;
-using System.Threading.Tasks;
-using Xunit.Abstractions;
-using Xunit.Sdk;
-
-public abstract class PulsarServiceBase : IPulsarService
-{
-    protected readonly IMessageSink MessageSink;
-    private readonly CancellationTokenSource _cts;
-    private readonly HttpClient _adminClient;
-
-    protected PulsarServiceBase(IMessageSink messageSink)
-    {
-        MessageSink = messageSink;
-        _cts = new CancellationTokenSource();
-        _adminClient = new HttpClient();
-    }
-
-    public abstract Task InitializeAsync();
-
-    public async Task DisposeAsync()
-    {
-        _adminClient.Dispose();
-        _cts.Dispose();
-
-        try
-        {
-            await OnDispose();
-        }
-        catch (Exception e)
-        {
-            MessageSink.OnMessage(new DiagnosticMessage("Error disposing: {0}", e));
-        }
-    }
-
-    protected virtual Task OnDispose()
-        => Task.CompletedTask;
-
-    public abstract Uri GetBrokerUri();
-
-    public abstract Uri GetWebServiceUri();
-
-    public async Task<HttpResponseMessage?> CreatePartitionedTopic(string restTopic, int numPartitions)
-    {
-        var content = new StringContent(numPartitions.ToString(), Encoding.UTF8, "application/json");
-        return await _adminClient.PutAsync($"{GetWebServiceUri()}admin/v2/{restTopic}/partitions", content, _cts.Token).ConfigureAwait(false);
-    }
-}
diff --git a/tests/DotPulsar.IntegrationTests/Services/ServiceFactory.cs b/tests/DotPulsar.IntegrationTests/Services/ServiceFactory.cs
deleted file mode 100644
index 90e10ec..0000000
--- a/tests/DotPulsar.IntegrationTests/Services/ServiceFactory.cs
+++ /dev/null
@@ -1,36 +0,0 @@
-/*
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-namespace DotPulsar.IntegrationTests.Services;
-
-using Abstraction;
-using Xunit.Abstractions;
-
-public static class ServiceFactory
-{
-    private const string _pulsarDeploymentType = "PULSAR_DEPLOYMENT_TYPE";
-    private const string _containerDeployment = "container";
-
-    public static IPulsarService CreatePulsarService(IMessageSink messageSink)
-    {
-        var deploymentType = System.Environment.GetEnvironmentVariable(_pulsarDeploymentType);
-
-        if (deploymentType == _containerDeployment)
-        {
-            return new StandaloneContainerService(messageSink);
-        }
-
-        return new StandaloneExternalService(messageSink);
-    }
-}
diff --git a/tests/DotPulsar.IntegrationTests/Services/StandaloneContainerService.cs b/tests/DotPulsar.IntegrationTests/Services/StandaloneContainerService.cs
deleted file mode 100644
index 4d7d42e..0000000
--- a/tests/DotPulsar.IntegrationTests/Services/StandaloneContainerService.cs
+++ /dev/null
@@ -1,68 +0,0 @@
-/*
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-namespace DotPulsar.IntegrationTests.Services;
-
-using System;
-using System.Net.Http;
-using System.Threading.Tasks;
-using Xunit.Abstractions;
-using Xunit.Sdk;
-
-public sealed class StandaloneContainerService : PulsarServiceBase
-{
-    public StandaloneContainerService(IMessageSink messageSink) : base(messageSink) { }
-
-    public override async Task InitializeAsync()
-    {
-        await TakeDownPulsar(); // clean-up if anything was left running from previous run
-
-        await ProcessAsyncHelper.ExecuteShellCommand("docker-compose", "-f docker-compose-standalone-tests.yml up -d")
-            .ThrowOnFailure();
-
-        var waitTries = 10;
-
-        using var handler = new HttpClientHandler { AllowAutoRedirect = true };
-
-        using var client = new HttpClient(handler);
-
-        while (waitTries > 0)
-        {
-            try
-            {
-                await client.GetAsync("http://localhost:54546/metrics/").ConfigureAwait(false);
-                return;
-            }
-            catch
-            {
-                waitTries--;
-                await Task.Delay(5000).ConfigureAwait(false);
-            }
-        }
-
-        throw new Exception("Unable to confirm Pulsar has initialized");
-    }
-
-    protected override Task OnDispose() => TakeDownPulsar();
-
-    private Task TakeDownPulsar()
-        => ProcessAsyncHelper.ExecuteShellCommand("docker-compose", "-f docker-compose-standalone-tests.yml down")
-            .LogFailure(s => MessageSink.OnMessage(new DiagnosticMessage("Error bringing down container: {0}", s)));
-
-    public override Uri GetBrokerUri()
-        => new("pulsar://localhost:54545");
-
-    public override Uri GetWebServiceUri()
-        => new("http://localhost:54546");
-}
diff --git a/tests/DotPulsar.IntegrationTests/Services/StandaloneExternalService.cs b/tests/DotPulsar.IntegrationTests/Services/StandaloneExternalService.cs
deleted file mode 100644
index c1ce2f0..0000000
--- a/tests/DotPulsar.IntegrationTests/Services/StandaloneExternalService.cs
+++ /dev/null
@@ -1,31 +0,0 @@
-/*
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-namespace DotPulsar.IntegrationTests.Services;
-
-using System;
-using System.Threading.Tasks;
-using Xunit.Abstractions;
-
-public sealed class StandaloneExternalService : PulsarServiceBase
-{
-    public StandaloneExternalService(IMessageSink messageSink) : base(messageSink) { }
-    public override Task InitializeAsync() => Task.CompletedTask;
-
-    public override Uri GetBrokerUri()
-        => new("pulsar://localhost:6650");
-
-    public override Uri GetWebServiceUri()
-        => new("http://localhost:8080");
-}
diff --git a/tests/DotPulsar.IntegrationTests/Fixtures/StandaloneClusterTests.cs b/tests/DotPulsar.IntegrationTests/StandaloneCollection.cs
similarity index 75%
rename from tests/DotPulsar.IntegrationTests/Fixtures/StandaloneClusterTests.cs
rename to tests/DotPulsar.IntegrationTests/StandaloneCollection.cs
index 1a3504b..04b2e07 100644
--- a/tests/DotPulsar.IntegrationTests/Fixtures/StandaloneClusterTests.cs
+++ b/tests/DotPulsar.IntegrationTests/StandaloneCollection.cs
@@ -12,9 +12,9 @@
  * limitations under the License.
  */
 
-namespace DotPulsar.IntegrationTests.Fixtures;
+namespace DotPulsar.IntegrationTests;
 
 using Xunit;
 
-[CollectionDefinition(nameof(StandaloneClusterTest))]
-public class StandaloneClusterTest : ICollectionFixture<StandaloneClusterFixture> { }
+[CollectionDefinition(nameof(StandaloneCollection))]
+public sealed class StandaloneCollection : ICollectionFixture<StandaloneFixture> { }
diff --git a/tests/DotPulsar.IntegrationTests/StandaloneFixture.cs b/tests/DotPulsar.IntegrationTests/StandaloneFixture.cs
new file mode 100644
index 0000000..b5d898b
--- /dev/null
+++ b/tests/DotPulsar.IntegrationTests/StandaloneFixture.cs
@@ -0,0 +1,157 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace DotPulsar.IntegrationTests;
+
+using DotPulsar.TestHelpers;
+using System;
+using System.Net.Http;
+using System.Net.Http.Headers;
+using System.Text;
+using System.Threading;
+using System.Threading.Tasks;
+using Xunit;
+using Xunit.Abstractions;
+using Xunit.Sdk;
+
+/// <summary>
+/// xUnit fixture that brings a Pulsar standalone container up with docker-compose before the
+/// tests run and takes it down again afterwards.
+/// </summary>
+public sealed class StandaloneFixture : IAsyncLifetime
+{
+    private readonly string _containerName;
+    private readonly Uri _webServiceUri;
+    private readonly CancellationTokenSource _cts;
+    private readonly IMessageSink _messageSink;
+
+    public StandaloneFixture(IMessageSink messageSink)
+    {
+        _containerName = "standalone";
+        _webServiceUri = new("http://localhost:54548");
+        ServiceUrl = new("pulsar://localhost:54547");
+        _cts = new CancellationTokenSource();
+        _messageSink = messageSink;
+    }
+
+    /// <summary>The broker address that test clients connect to.</summary>
+    public Uri ServiceUrl { get; }
+
+    /// <summary>Cancels outstanding admin requests and takes the container down.</summary>
+    public async Task DisposeAsync()
+    {
+        _cts.Cancel();
+
+        try
+        {
+            await TakeDownPulsar();
+        }
+        finally
+        {
+            _cts.Dispose();
+        }
+    }
+
+    /// <summary>
+    /// Starts the container and polls the metrics endpoint until it answers with a success
+    /// status code, giving up after 10 attempts that are 10 seconds apart.
+    /// </summary>
+    public async Task InitializeAsync()
+    {
+        await TakeDownPulsar(); // clean-up if anything was left running from previous run
+
+        await ProcessAsyncHelper
+            .ExecuteShellCommand("docker-compose", $"-f {_containerName}.yml up -d")
+            .ThrowOnFailure();
+
+        using var handler = new HttpClientHandler { AllowAutoRedirect = true };
+        using var client = new HttpClient(handler);
+
+        var token = await GetToken(Timeout.InfiniteTimeSpan);
+        client.DefaultRequestHeaders.Authorization = new AuthenticationHeaderValue("Bearer", token);
+
+        var waitTries = 10;
+        while (waitTries > 0)
+        {
+            try
+            {
+                var requestUri = new Uri(_webServiceUri, "metrics/");
+                // Dispose each probe response so failed attempts do not pile up open responses.
+                using var response = await client.GetAsync(requestUri);
+                if (response.IsSuccessStatusCode)
+                    return;
+            }
+            catch (Exception e)
+            {
+                _messageSink.OnMessage(new DiagnosticMessage("Error trying to fetch metrics: {0}", e));
+            }
+
+            waitTries--;
+            await Task.Delay(TimeSpan.FromSeconds(10));
+        }
+
+        throw new Exception("Unable to confirm Pulsar has initialized");
+    }
+
+    /// <summary>
+    /// Creates a token for the subject "test-user" by running the Pulsar token tool inside the container.
+    /// </summary>
+    /// <param name="expiryTime">Requested lifetime; <see cref="Timeout.InfiniteTimeSpan"/> omits the expiry argument.</param>
+    /// <returns>The output captured from the token command.</returns>
+    /// <exception cref="InvalidOperationException">The docker command did not complete.</exception>
+    public async Task<string> GetToken(TimeSpan expiryTime)
+    {
+        var arguments = $"exec {_containerName} bin/pulsar tokens create --secret-key file:///appdata/my-secret.key --subject test-user";
+
+        if (expiryTime != Timeout.InfiniteTimeSpan)
+        {
+            // Send whole seconds: interpolating the raw double prints a fraction (with a
+            // culture-dependent separator) for sub-second values. Rounding up keeps the
+            // token valid for at least the requested time.
+            var wholeSeconds = (long)Math.Ceiling(expiryTime.TotalSeconds);
+            arguments += $" --expiry-time {wholeSeconds}s";
+        }
+
+        var tokenCreateRequest = await ProcessAsyncHelper.ExecuteShellCommand("docker", arguments);
+
+        if (tokenCreateRequest.Completed)
+            return tokenCreateRequest.Output;
+
+        throw new InvalidOperationException($"Getting token from container failed: {tokenCreateRequest.Output}");
+    }
+
+    /// <summary>Creates a partitioned topic through the admin REST API.</summary>
+    /// <param name="restTopic">Topic path inserted after "admin/v2/" in the request URI.</param>
+    /// <param name="numPartitions">Partition count sent as the request body.</param>
+    /// <exception cref="Exception">The admin API answered with a non-success status code.</exception>
+    public async Task CreatePartitionedTopic(string restTopic, int numPartitions)
+    {
+        using var client = new HttpClient();
+        var token = await GetToken(Timeout.InfiniteTimeSpan);
+        client.DefaultRequestHeaders.Authorization = new AuthenticationHeaderValue("Bearer", token);
+        var requestUri = new Uri(_webServiceUri, $"admin/v2/{restTopic}/partitions");
+        using var content = new StringContent(numPartitions.ToString(), Encoding.UTF8, "application/json");
+        using var response = await client.PutAsync(requestUri, content, _cts.Token);
+        if (response.IsSuccessStatusCode)
+            return;
+
+        throw new Exception($"Could not create the partition topic. Got status code {response.StatusCode}");
+    }
+
+    /// <summary>Runs "docker-compose down"; a failure is reported to the message sink rather than thrown.</summary>
+    private async Task TakeDownPulsar()
+        => await ProcessAsyncHelper
+        .ExecuteShellCommand("docker-compose", $"-f {_containerName}.yml down")
+        .LogFailure(s => _messageSink.OnMessage(new DiagnosticMessage($"Error bringing down container: {s}")));
+}
diff --git a/tests/DotPulsar.IntegrationTests/TokenRefreshTests.cs b/tests/DotPulsar.IntegrationTests/TokenRefreshTests.cs
deleted file mode 100644
index e244cab..0000000
--- a/tests/DotPulsar.IntegrationTests/TokenRefreshTests.cs
+++ /dev/null
@@ -1,191 +0,0 @@
-/*
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-namespace DotPulsar.IntegrationTests;
-
-using Abstraction;
-using Abstractions;
-using Extensions;
-using Fixtures;
-using System;
-using System.Collections.Generic;
-using System.Diagnostics;
-using System.Linq;
-using System.Threading;
-using System.Threading.Tasks;
-using Xunit;
-using Xunit.Abstractions;
-
-[Collection(nameof(StandaloneTokenClusterTests))]
-public class TokenRefreshTests
-{
-    public enum TokenTestRefreshType
-    {
-        Standard,
-        FailAtStartup,
-        FailOnRefresh,
-        TimeoutOnRefresh
-    }
-
-    private const string MyTopic = "persistent://public/default/mytopic";
-    private readonly ITestOutputHelper _testOutputHelper;
-    private readonly IPulsarService _pulsarService;
-
-    public TokenRefreshTests(ITestOutputHelper outputHelper, StandaloneTokenClusterFixture fixture)
-    {
-        _testOutputHelper = outputHelper;
-        Debug.Assert(fixture.PulsarService != null, "fixture.PulsarService != null");
-        _pulsarService = fixture.PulsarService;
-    }
-
-    [InlineData(TokenTestRefreshType.Standard, 0)] // Standard happy path with no token refresh failures
-    [InlineData(TokenTestRefreshType.FailAtStartup, 1)] // 1 Failure at startup, not on refresh
-    [InlineData(TokenTestRefreshType.FailOnRefresh, 2)] // Fails on refresh which will force a reconnection and fail once more on new connection
-    [InlineData(TokenTestRefreshType.TimeoutOnRefresh, 0)] // Connection will be disconnected by server due to slow response to auth challenge
-    [Theory]
-    public async Task TestExpiryRefresh(TokenTestRefreshType refreshType, int timesToFail)
-    {
-        var publishingStarted = false;
-        var delayedNames = new HashSet<string>();
-        ValueTask<string> GetToken(string name, ref int count)
-        {
-            if (refreshType is TokenTestRefreshType.Standard)
-            {
-                return GetAuthToken(name);
-            }
-
-            if (refreshType is TokenTestRefreshType.FailAtStartup && !publishingStarted && ++count <= timesToFail)
-            {
-                return ValueTask.FromException<string>(new Exception("Initial Token Failed"));
-            }
-
-            if (refreshType is TokenTestRefreshType.FailOnRefresh && publishingStarted && ++count <= timesToFail)
-            {
-                return ValueTask.FromException<string>(count == 1 ? new Exception("Refresh Failed") : new Exception("Initial Token Failed"));
-            }
-
-            if (refreshType is TokenTestRefreshType.TimeoutOnRefresh && publishingStarted && !delayedNames.Contains(name))
-            {
-                delayedNames.Add(name);
-                Task.Delay(6000);
-            }
-
-            return GetAuthToken(name);
-        }
-
-        var producerTokenCount = 0;
-        await using var producerClient = GetPulsarClient("Producer", (ct) => GetToken("Producer", ref producerTokenCount));
-
-        var consumerTokenCount = 0;
-        await using var consumerClient = GetPulsarClient("Consumer", (ct) => GetToken("Consumer", ref consumerTokenCount));
-
-        await using var producer = producerClient.NewProducer(Schema.String)
-            .Topic(MyTopic)
-            .StateChangedHandler(Monitor)
-            .Create();
-
-        await using var consumer = consumerClient.NewConsumer(Schema.String)
-            .Topic(MyTopic)
-            .StateChangedHandler(Monitor)
-            .SubscriptionName("test-sub")
-            .InitialPosition(SubscriptionInitialPosition.Earliest)
-            .Create();
-
-        const int messageCount = 20;
-        var received = new List<string>(messageCount);
-
-        var publisherTask = Task.Run(async () =>
-        {
-            for (var i = 0; i < messageCount; i++)
-            {
-                _testOutputHelper.WriteLine("Trying to publish message for index {0}", i);
-                var messageId = await producer.Send(i.ToString());
-                publishingStarted = true;
-                _testOutputHelper.WriteLine("Published message {0} for index {1}", messageId, i);
-                await Task.Delay(1000);
-            }
-        });
-
-        var consumerTask = Task.Run(async () =>
-        {
-            for (var i = 0; i < messageCount; i++)
-            {
-                var message = await consumer.Receive();
-                received.Add(message.Value());
-            }
-        });
-
-        var timeoutTask = Task.Delay(60_000);
-        await Task.WhenAny(Task.WhenAll(consumerTask, publisherTask), timeoutTask);
-        Assert.False(timeoutTask.IsCompleted);
-
-        var expected = Enumerable.Range(0, messageCount).Select(i => i.ToString()).ToList();
-        var missing = expected.Except(received).ToList();
-
-        if (missing.Count > 0)
-        {
-            Assert.True(false, $"Missing values: {string.Join(",", missing)}");
-        }
-    }
-
-    private IPulsarClient GetPulsarClient(string name, Func<CancellationToken, ValueTask<string>> tokenFactory)
-        => PulsarClient.Builder()
-            .Authentication(AuthenticationFactory.Token(tokenFactory))
-            .RetryInterval(TimeSpan.FromSeconds(1))
-            .ExceptionHandler(ec =>
-            {
-                _testOutputHelper.WriteLine("Error (handled={0}) occurred in {1} client: {2}", ec.ExceptionHandled, name, ec.Exception);
-            })
-            .ServiceUrl(_pulsarService.GetBrokerUri()).Build();
-
-    private async ValueTask<string> GetAuthToken(string name)
-    {
-        var result = await StandaloneTokenClusterFixture.GetAuthToken(true);
-        _testOutputHelper.WriteLine("{0} received token {1}", name, result);
-        return result;
-    }
-
-    private void Monitor(ProducerStateChanged stateChanged, CancellationToken cancellationToken)
-    {
-        var stateMessage = stateChanged.ProducerState switch
-        {
-            ProducerState.Connected => "is connected",
-            ProducerState.Disconnected => "is disconnected",
-            ProducerState.PartiallyConnected => "is partially connected",
-            ProducerState.Closed => "has closed",
-            ProducerState.Faulted => "has faulted",
-            _ => $"has an unknown state '{stateChanged.ProducerState}'"
-        };
-
-        var topic = stateChanged.Producer.Topic;
-        _testOutputHelper.WriteLine($"The producer for topic '{topic}' " + stateMessage);
-    }
-
-    private void Monitor(ConsumerStateChanged stateChanged, CancellationToken cancellationToken)
-    {
-        var stateMessage = stateChanged.ConsumerState switch
-        {
-            ConsumerState.Active => "is active",
-            ConsumerState.Inactive => "is inactive",
-            ConsumerState.Disconnected => "is disconnected",
-            ConsumerState.Closed => "has closed",
-            ConsumerState.ReachedEndOfTopic => "has reached end of topic",
-            ConsumerState.Faulted => "has faulted",
-            _ => $"has an unknown state '{stateChanged.ConsumerState}'"
-        };
-
-        var topic = stateChanged.Consumer.Topic;
-        _testOutputHelper.WriteLine($"The consumer for topic '{topic}' " + stateMessage);
-    }
-}
diff --git a/tests/DotPulsar.IntegrationTests/TokenTests.cs b/tests/DotPulsar.IntegrationTests/TokenTests.cs
new file mode 100644
index 0000000..f6306e5
--- /dev/null
+++ b/tests/DotPulsar.IntegrationTests/TokenTests.cs
@@ -0,0 +1,170 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace DotPulsar.IntegrationTests;
+
+using DotPulsar.Abstractions;
+using DotPulsar.Exceptions;
+using Extensions;
+using FluentAssertions;
+using System;
+using System.Threading;
+using System.Threading.Tasks;
+using Xunit;
+using Xunit.Abstractions;
+
+/// <summary>
+/// Integration tests for token authentication that use a caller-provided token supplier
+/// against the broker started by <see cref="StandaloneFixture"/>.
+/// </summary>
+[Collection(nameof(StandaloneCollection))]
+public class TokenTests
+{
+    private const string MyTopic = "persistent://public/default/mytopic";
+
+    private readonly ITestOutputHelper _testOutputHelper;
+    private readonly StandaloneFixture _fixture;
+
+    public TokenTests(ITestOutputHelper outputHelper, StandaloneFixture fixture)
+    {
+        _testOutputHelper = outputHelper;
+        _fixture = fixture;
+    }
+
+    [Fact]
+    public async Task TokenSupplier_WhenTokenSupplierInitiallyThrowsAnException_ShouldFaultProducer()
+    {
+        // Arrange
+        await using var client = CreateClient(ct => throw new Exception());
+        await using var producer = CreateProducer(client);
+
+        // Act
+        var exception = await Record.ExceptionAsync(() => producer.Send("Test").AsTask());
+        var state = await producer.OnStateChangeTo(ProducerState.Faulted);
+
+        // Assert
+        exception.Should().BeOfType<Exception>();
+        state.Should().Be(ProducerState.Faulted);
+    }
+
+    [Fact]
+    public async Task TokenSupplier_WhenTokenSupplierThrowsAnExceptionOnAuthChallenge_ShouldFaultProducer()
+    {
+        // Arrange
+        var throwException = false;
+        await using var client = CreateClient(async ct =>
+        {
+            if (throwException)
+                throw new Exception();
+            var token = await _fixture.GetToken(TimeSpan.FromSeconds(10));
+            _testOutputHelper.WriteLine($"Received token: {token}");
+            return token;
+        });
+
+        await using var producer = CreateProducer(client);
+
+        // Act
+        _ = await producer.Send("Test");  // Make sure we have a working connection
+        throwException = true; // the next token request made by the client will now throw
+        var state = await producer.OnStateChangeTo(ProducerState.Faulted);
+
+        // Assert
+        state.Should().Be(ProducerState.Faulted);
+    }
+
+    [Fact]
+    public async Task TokenSupplier_WhenTokenSupplierReturnsToLate_ShouldFaultProducer()
+    {
+        // Arrange
+        await using var client = CreateClient(async ct =>
+        {
+            await Task.Delay(TimeSpan.FromSeconds(10), ct);
+            return string.Empty;
+        });
+
+        await using var producer = CreateProducer(client);
+
+        // Act
+        var exception = await Record.ExceptionAsync(() => producer.Send("Test").AsTask());
+        var state = await producer.OnStateChangeTo(ProducerState.Faulted);
+
+        // Assert
+        exception.Should().BeOfType<AuthenticationException>();
+        state.Should().Be(ProducerState.Faulted);
+    }
+
+    [Fact]
+    public async Task TokenSupplier_WhenTokenSupplierReturnValidToken_ShouldStayConnected()
+    {
+        // Arrange
+        var refreshCount = 0;
+        var tcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously);
+
+        await using var client = CreateClient(async ct =>
+        {
+            // The supplier is invoked by the client rather than by the test, so count atomically
+            // and signal with TrySetResult, which cannot throw if the task is already completed.
+            if (Interlocked.Increment(ref refreshCount) == 3)
+                tcs.TrySetResult();
+
+            var token = await _fixture.GetToken(TimeSpan.FromSeconds(10));
+            _testOutputHelper.WriteLine($"Received token: {token}");
+            return token;
+        });
+
+        await using var producer = CreateProducer(client);
+
+        // Act
+        _ = await producer.Send("Test");  // Make sure we have a working connection
+        await tcs.Task;
+        var state = await producer.OnStateChangeTo(ProducerState.Connected);
+
+        // Assert
+        state.Should().Be(ProducerState.Connected);
+    }
+
+    /// <summary>Builds a client for the fixture's broker that authenticates with <paramref name="tokenSupplier"/>.</summary>
+    private IPulsarClient CreateClient(Func<CancellationToken, ValueTask<string>> tokenSupplier)
+        => PulsarClient
+        .Builder()
+        .Authentication(AuthenticationFactory.Token(tokenSupplier))
+        .ExceptionHandler(ec => _testOutputHelper.WriteLine($"Exception: {ec.Exception}"))
+        .ServiceUrl(_fixture.ServiceUrl)
+        .Build();
+
+    /// <summary>Creates a string producer on the test topic whose state changes are written to the test output.</summary>
+    private IProducer<string> CreateProducer(IPulsarClient client)
+        => client
+        .NewProducer(Schema.String)
+        .Topic(MyTopic)
+        .StateChangedHandler(Monitor)
+        .Create();
+
+    /// <summary>Writes a description of the producer's new state to the test output.</summary>
+    private void Monitor(ProducerStateChanged stateChanged, CancellationToken _)
+    {
+        var stateMessage = stateChanged.ProducerState switch
+        {
+            ProducerState.Connected => "is connected",
+            ProducerState.Disconnected => "is disconnected",
+            ProducerState.PartiallyConnected => "is partially connected",
+            ProducerState.Closed => "has closed",
+            ProducerState.Faulted => "has faulted",
+            _ => $"has an unknown state '{stateChanged.ProducerState}'"
+        };
+
+        var topic = stateChanged.Producer.Topic;
+        _testOutputHelper.WriteLine($"The producer for topic '{topic}' {stateMessage}");
+    }
+}
diff --git a/tests/DotPulsar.IntegrationTests/docker-compose-standalone-tests.yml b/tests/DotPulsar.IntegrationTests/docker-compose-standalone-tests.yml
deleted file mode 100644
index 0e517e5..0000000
--- a/tests/DotPulsar.IntegrationTests/docker-compose-standalone-tests.yml
+++ /dev/null
@@ -1,21 +0,0 @@
-version: '3.5'
-
-services:
-
-  pulsar:
-    container_name: pulsar-standalone
-    image: 'apachepulsar/pulsar:2.7.0'
-    ports:
-      - '54546:8080'
-      - '54545:6650'
-    environment:
-      PULSAR_MEM: " -Xms1g -Xmx1g -XX:MaxDirectMemorySize=2g"
-    command: |
-      /bin/bash -c "bin/apply-config-from-env.py conf/standalone.conf && bin/pulsar standalone --no-functions-worker"
-    networks:
-      - pulsar-standalone
-        
-networks:
-  pulsar-standalone:
-    name: pulsar-standalone
-    driver: bridge
diff --git a/tests/DotPulsar.IntegrationTests/docker-compose-standalone-token-tests.yml b/tests/DotPulsar.IntegrationTests/standalone.yml
similarity index 63%
rename from tests/DotPulsar.IntegrationTests/docker-compose-standalone-token-tests.yml
rename to tests/DotPulsar.IntegrationTests/standalone.yml
index af76c5b..035c1d2 100644
--- a/tests/DotPulsar.IntegrationTests/docker-compose-standalone-token-tests.yml
+++ b/tests/DotPulsar.IntegrationTests/standalone.yml
@@ -2,12 +2,12 @@
   
 services:
         
-  pulsar-tokens:
-    container_name: pulsar-tokens
+  standalone:
+    container_name: standalone
     image: 'apachepulsar/pulsar:2.7.0'
     ports:
-      - '54548:8081'
-      - '54547:6651'
+      - '54548:8080'
+      - '54547:6650'
     volumes:
       - ./appdata/:/appdata
     environment:
@@ -17,18 +17,10 @@ services:
       - authenticationProviders=org.apache.pulsar.broker.authentication.AuthenticationProviderToken
       - authenticateOriginalAuthData=false
       - brokerClientAuthenticationPlugin=org.apache.pulsar.client.impl.auth.AuthenticationToken
-      - brokerClientAuthenticationParameters=token:eyJhbGciOiJIUzI1NiJ9.eyJzdWIiOiJ0ZXN0LXVzZXIifQ.CoBrja1EHr0e2kZKGFS8M-xS2SOC2E08yZmjktvcYOs
+      - brokerClientAuthenticationParameters=token:eyJhbGciOiJIUzI1NiJ9.eyJzdWIiOiJ0ZXN0LXVzZXIifQ.KGnuZLJys6MUSth__ZRYdb4sPIKe9_kRrm2wfZ7Dwrc
       - superUserRoles=test-user
       - PULSAR_PREFIX_authenticationRefreshCheckSeconds=5
-      - webServicePort=8081
-      - brokerServicePort=6651
-#      - PULSAR_LOG_LEVEL=debug
+      - webServicePort=8080
+      - brokerServicePort=6650
     command: |
-      /bin/bash -c "bin/apply-config-from-env.py conf/standalone.conf && bin/pulsar standalone --no-functions-worker"
-    networks:
-      - pulsar-tokens
-        
-networks:
-  pulsar-tokens:
-    name: pulsar-tokens
-    driver: bridge
+      /bin/bash -c "bin/apply-config-from-env.py conf/standalone.conf && bin/pulsar standalone --no-functions-worker"
\ No newline at end of file
diff --git a/tests/DotPulsar.StressTests/DotPulsar.StressTests.csproj b/tests/DotPulsar.StressTests/DotPulsar.StressTests.csproj
index 2d5d538..0a7476d 100644
--- a/tests/DotPulsar.StressTests/DotPulsar.StressTests.csproj
+++ b/tests/DotPulsar.StressTests/DotPulsar.StressTests.csproj
@@ -12,7 +12,7 @@
       <PrivateAssets>all</PrivateAssets>
       <IncludeAssets>runtime; build; native; contentfiles; analyzers; buildtransitive</IncludeAssets>
     </PackageReference>
-    <PackageReference Include="coverlet.collector" Version="3.1.0">
+    <PackageReference Include="coverlet.collector" Version="3.1.1">
       <PrivateAssets>all</PrivateAssets>
       <IncludeAssets>runtime; build; native; contentfiles; analyzers; buildtransitive</IncludeAssets>
     </PackageReference>
@@ -26,7 +26,7 @@
 
   <ItemGroup>
     <ProjectReference Include="..\..\src\DotPulsar\DotPulsar.csproj" />
-    <ProjectReference Include="..\DotPulsar.IntegrationTests\DotPulsar.IntegrationTests.csproj" />
+    <ProjectReference Include="..\DotPulsar.TestHelpers\DotPulsar.TestHelpers.csproj" />
   </ItemGroup>
 
 </Project>
diff --git a/tests/DotPulsar.StressTests/EnumerableTaskExtensions.cs b/tests/DotPulsar.StressTests/EnumerableTaskExtensions.cs
index 078057d..2437085 100644
--- a/tests/DotPulsar.StressTests/EnumerableTaskExtensions.cs
+++ b/tests/DotPulsar.StressTests/EnumerableTaskExtensions.cs
@@ -44,7 +44,7 @@ public static class EnumerableValueTaskExtensions
 
     [DebuggerStepThrough]
     public static async Task<TResult[]> WhenAllAsTask<TResult>(this IEnumerable<ValueTask<TResult>> source) where TResult : notnull
-        => await source.WhenAll().ConfigureAwait(false);
+        => await source.WhenAll();
 
     [DebuggerStepThrough]
     public static async IAsyncEnumerable<TResult> Enumerate<TResult>(this IEnumerable<ValueTask<TResult>> source) where TResult : notnull
@@ -52,7 +52,7 @@ public static class EnumerableValueTaskExtensions
         foreach (var operation in source.Select(GetInfo))
         {
             yield return operation.Task is not null
-                ? await operation.Task.ConfigureAwait(false)
+                ? await operation.Task
                 : operation.Result;
         }
     }
diff --git a/tests/DotPulsar.StressTests/Fixtures/StandaloneClusterFixture.cs b/tests/DotPulsar.StressTests/Fixtures/StandaloneClusterFixture.cs
index 500cd84..0236efe 100644
--- a/tests/DotPulsar.StressTests/Fixtures/StandaloneClusterFixture.cs
+++ b/tests/DotPulsar.StressTests/Fixtures/StandaloneClusterFixture.cs
@@ -14,7 +14,7 @@
 
 namespace DotPulsar.StressTests.Fixtures;
 
-using IntegrationTests;
+using DotPulsar.TestHelpers;
 using System;
 using System.Net.Http;
 using System.Threading.Tasks;
@@ -35,30 +35,30 @@ public class StandaloneClusterFixture : IAsyncLifetime
     {
         await TakeDownPulsar(); // clean-up if anything was left running from previous run
 
-        await ProcessAsyncHelper.ExecuteShellCommand("docker-compose", "-f docker-compose-standalone-tests.yml up -d")
+        await ProcessAsyncHelper
+            .ExecuteShellCommand("docker-compose", "-f docker-compose-standalone-tests.yml up -d")
             .ThrowOnFailure();
 
-        var waitTries = 10;
-
-        using var handler = new HttpClientHandler
-        {
-            AllowAutoRedirect = true
-        };
-
+        using var handler = new HttpClientHandler { AllowAutoRedirect = true };
         using var client = new HttpClient(handler);
 
+        var waitTries = 10;
         while (waitTries > 0)
         {
             try
             {
-                await client.GetAsync("http://localhost:54546/metrics/").ConfigureAwait(false);
-                return;
+                var requestUri = "http://localhost:54546/metrics/";
+                var response = await client.GetAsync(requestUri);
+                if (response.IsSuccessStatusCode)
+                    return;
             }
-            catch
+            catch (Exception e)
             {
-                waitTries--;
-                await Task.Delay(5000).ConfigureAwait(false);
+                _messageSink.OnMessage(new DiagnosticMessage("Error trying to fetch metrics: {0}", e));
             }
+
+            waitTries--;
+            await Task.Delay(TimeSpan.FromSeconds(10));
         }
 
         throw new Exception("Unable to confirm Pulsar has initialized");
@@ -72,11 +72,12 @@ public class StandaloneClusterFixture : IAsyncLifetime
         }
         catch (Exception e)
         {
-            _messageSink.OnMessage(new DiagnosticMessage("Error taking down pulsar: {0}", e));
+            _messageSink.OnMessage(new DiagnosticMessage($"Error taking down pulsar: {e}"));
         }
     }
 
     private Task TakeDownPulsar()
-        => ProcessAsyncHelper.ExecuteShellCommand("docker-compose", "-f docker-compose-standalone-tests.yml down")
-            .LogFailure(s => _messageSink.OnMessage(new DiagnosticMessage("Error bringing down container: {0}", s)));
+        => ProcessAsyncHelper
+        .ExecuteShellCommand("docker-compose", "-f docker-compose-standalone-tests.yml down")
+        .LogFailure(s => _messageSink.OnMessage(new DiagnosticMessage($"Error bringing down container: {s}")));
 }
diff --git a/tests/DotPulsar.StressTests/XunitExceptionHandler.cs b/tests/DotPulsar.StressTests/XunitExceptionHandler.cs
index 59211f4..79102d1 100644
--- a/tests/DotPulsar.StressTests/XunitExceptionHandler.cs
+++ b/tests/DotPulsar.StressTests/XunitExceptionHandler.cs
@@ -38,9 +38,6 @@ internal class XunitExceptionHandler : IHandleException
         await _exceptionHandler.OnException(exceptionContext).ConfigureAwait(false);
 
         if (!exceptionContext.ExceptionHandled)
-            _output.WriteLine(
-                $"{exceptionContext.Exception.GetType().Name} " +
-                $"{exceptionContext.Exception.Message}{Environment.NewLine}" +
-                $"{exceptionContext.Exception.StackTrace}");
+            _output.WriteLine($"Got exception: {exceptionContext.Exception}");
     }
 }
diff --git a/tests/DotPulsar.TestHelpers/DotPulsar.TestHelpers.csproj b/tests/DotPulsar.TestHelpers/DotPulsar.TestHelpers.csproj
new file mode 100644
index 0000000..132c02c
--- /dev/null
+++ b/tests/DotPulsar.TestHelpers/DotPulsar.TestHelpers.csproj
@@ -0,0 +1,9 @@
+<Project Sdk="Microsoft.NET.Sdk">
+
+  <PropertyGroup>
+    <TargetFramework>net6.0</TargetFramework>
+    <ImplicitUsings>enable</ImplicitUsings>
+    <Nullable>enable</Nullable>
+  </PropertyGroup>
+
+</Project>
diff --git a/tests/DotPulsar.IntegrationTests/ProcessAsyncHelper.cs b/tests/DotPulsar.TestHelpers/ProcessAsyncHelper.cs
similarity index 92%
rename from tests/DotPulsar.IntegrationTests/ProcessAsyncHelper.cs
rename to tests/DotPulsar.TestHelpers/ProcessAsyncHelper.cs
index c967563..73865cd 100644
--- a/tests/DotPulsar.IntegrationTests/ProcessAsyncHelper.cs
+++ b/tests/DotPulsar.TestHelpers/ProcessAsyncHelper.cs
@@ -12,7 +12,7 @@
  * limitations under the License.
  */
 
-namespace DotPulsar.IntegrationTests;
+namespace DotPulsar.TestHelpers;
 
 using System;
 using System.Diagnostics;
@@ -26,9 +26,7 @@ public static class ProcessAsyncHelper
         var result = await resultTask;
 
         if (!result.Completed)
-        {
-            throw new InvalidOperationException($"Process did not complete correctly, {Environment.NewLine}{result.Output}");
-        }
+            throw new InvalidOperationException($"Process did not complete correctly: {result.Output}");
     }
 
     public static async Task LogFailure(this Task<ProcessResult> resultTask, Action<string> logAction)
@@ -36,15 +34,11 @@ public static class ProcessAsyncHelper
         var result = await resultTask;
 
         if (!result.Completed)
-        {
             logAction(result.Output);
-        }
     }
 
     public static async Task<ProcessResult> ExecuteShellCommand(string command, string arguments)
     {
-        var result = new ProcessResult();
-
         using var process = new Process();
 
         process.StartInfo.FileName = command;
@@ -60,14 +54,10 @@ public static class ProcessAsyncHelper
         process.OutputDataReceived += (s, e) =>
         {
             // The output stream has been closed i.e. the process has terminated
-            if (e.Data == null)
-            {
+            if (e.Data is null)
                 outputCloseEvent.SetResult(true);
-            }
             else
-            {
                 outputBuilder.Append(e.Data);
-            }
         };
 
         var errorBuilder = new StringBuilder();
@@ -76,16 +66,13 @@ public static class ProcessAsyncHelper
         process.ErrorDataReceived += (s, e) =>
         {
             // The error stream has been closed i.e. the process has terminated
-            if (e.Data == null)
-            {
+            if (e.Data is null)
                 errorCloseEvent.SetResult(true);
-            }
             else
-            {
                 errorBuilder.Append(e.Data);
-            }
         };
 
+        var result = new ProcessResult();
         bool isStarted;
 
         try
diff --git a/tests/DotPulsar.Tests/DotPulsar.Tests.csproj b/tests/DotPulsar.Tests/DotPulsar.Tests.csproj
index 4d44986..713a379 100644
--- a/tests/DotPulsar.Tests/DotPulsar.Tests.csproj
+++ b/tests/DotPulsar.Tests/DotPulsar.Tests.csproj
@@ -14,7 +14,7 @@
       <PrivateAssets>all</PrivateAssets>
       <IncludeAssets>runtime; build; native; contentfiles; analyzers; buildtransitive</IncludeAssets>
     </PackageReference>
-    <PackageReference Include="coverlet.collector" Version="3.1.0">
+    <PackageReference Include="coverlet.collector" Version="3.1.1">
       <PrivateAssets>all</PrivateAssets>
       <IncludeAssets>runtime; build; native; contentfiles; analyzers; buildtransitive</IncludeAssets>
     </PackageReference>