You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by bl...@apache.org on 2022/02/02 12:33:14 UTC
[pulsar-dotpulsar] branch master updated: Fixed the tests
This is an automated email from the ASF dual-hosted git repository.
blankensteiner pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-dotpulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 6c60892 Fixed the tests
6c60892 is described below
commit 6c6089237bf59a9cbf99a88d0b0e87d0904a72cf
Author: Daniel Blankensteiner <db...@vmail.dk>
AuthorDate: Wed Feb 2 13:33:03 2022 +0100
Fixed the tests
---
DotPulsar.sln | 13 +-
samples/Consuming/Program.cs | 2 +-
samples/Producing/Program.cs | 2 +-
samples/Reading/Program.cs | 2 +-
src/DotPulsar/Internal/Connection.cs | 4 +-
.../Abstraction/IPulsarService.cs | 43 -----
.../DotPulsar.IntegrationTests.csproj | 8 +-
.../Fixtures/StandaloneClusterFixture.cs | 45 -----
.../Fixtures/StandaloneTokenClusterFixture.cs | 99 -----------
.../Fixtures/StandaloneTokenClusterTests.cs | 20 ---
tests/DotPulsar.IntegrationTests/ProducerTests.cs | 34 ++--
.../Services/PulsarServiceBase.cs | 68 --------
.../Services/ServiceFactory.cs | 36 ----
.../Services/StandaloneContainerService.cs | 68 --------
.../Services/StandaloneExternalService.cs | 31 ----
...loneClusterTests.cs => StandaloneCollection.cs} | 6 +-
.../StandaloneFixture.cs | 121 +++++++++++++
.../TokenRefreshTests.cs | 191 ---------------------
tests/DotPulsar.IntegrationTests/TokenTests.cs | 162 +++++++++++++++++
.../docker-compose-standalone-tests.yml | 21 ---
...e-standalone-token-tests.yml => standalone.yml} | 24 +--
.../DotPulsar.StressTests.csproj | 4 +-
.../EnumerableTaskExtensions.cs | 4 +-
.../Fixtures/StandaloneClusterFixture.cs | 35 ++--
.../DotPulsar.StressTests/XunitExceptionHandler.cs | 5 +-
.../DotPulsar.TestHelpers.csproj | 9 +
.../ProcessAsyncHelper.cs | 23 +--
tests/DotPulsar.Tests/DotPulsar.Tests.csproj | 2 +-
28 files changed, 370 insertions(+), 712 deletions(-)
diff --git a/DotPulsar.sln b/DotPulsar.sln
index 9c4a0e3..f273d9e 100644
--- a/DotPulsar.sln
+++ b/DotPulsar.sln
@@ -1,7 +1,7 @@
Microsoft Visual Studio Solution File, Format Version 12.00
-# Visual Studio Version 16
-VisualStudioVersion = 16.0.29209.62
+# Visual Studio Version 17
+VisualStudioVersion = 17.0.32112.339
MinimumVisualStudioVersion = 10.0.40219.1
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "DotPulsar", "src\DotPulsar\DotPulsar.csproj", "{8D300FC3-2E35-4D23-B0DD-FEE9E153330A}"
EndProject
@@ -26,7 +26,9 @@ Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "Solution Items", "Solution
README.md = README.md
EndProjectSection
EndProject
-Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "DotPulsar.IntegrationTests", "tests\DotPulsar.IntegrationTests\DotPulsar.IntegrationTests.csproj", "{B44E52DB-DB45-4E31-AA2C-68E5C52AFDEB}"
+Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "DotPulsar.IntegrationTests", "tests\DotPulsar.IntegrationTests\DotPulsar.IntegrationTests.csproj", "{B44E52DB-DB45-4E31-AA2C-68E5C52AFDEB}"
+EndProject
+Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "DotPulsar.TestHelpers", "tests\DotPulsar.TestHelpers\DotPulsar.TestHelpers.csproj", "{F75D1A07-BBED-411C-A900-357790717D8F}"
EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
@@ -62,6 +64,10 @@ Global
{B44E52DB-DB45-4E31-AA2C-68E5C52AFDEB}.Debug|Any CPU.Build.0 = Debug|Any CPU
{B44E52DB-DB45-4E31-AA2C-68E5C52AFDEB}.Release|Any CPU.ActiveCfg = Release|Any CPU
{B44E52DB-DB45-4E31-AA2C-68E5C52AFDEB}.Release|Any CPU.Build.0 = Release|Any CPU
+ {F75D1A07-BBED-411C-A900-357790717D8F}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
+ {F75D1A07-BBED-411C-A900-357790717D8F}.Debug|Any CPU.Build.0 = Debug|Any CPU
+ {F75D1A07-BBED-411C-A900-357790717D8F}.Release|Any CPU.ActiveCfg = Release|Any CPU
+ {F75D1A07-BBED-411C-A900-357790717D8F}.Release|Any CPU.Build.0 = Release|Any CPU
EndGlobalSection
GlobalSection(SolutionProperties) = preSolution
HideSolutionNode = FALSE
@@ -73,6 +79,7 @@ Global
{14934BED-A222-47B2-A58A-CFC4AAB89B49} = {E7106D0F-B255-4631-9FB8-734FC5748FA9}
{6D44683B-865C-4D15-9F0A-1A8441354589} = {E7106D0F-B255-4631-9FB8-734FC5748FA9}
{B44E52DB-DB45-4E31-AA2C-68E5C52AFDEB} = {E1C932A9-6D4C-4DDF-8922-BE7B71F12F1C}
+ {F75D1A07-BBED-411C-A900-357790717D8F} = {E1C932A9-6D4C-4DDF-8922-BE7B71F12F1C}
EndGlobalSection
GlobalSection(ExtensibilityGlobals) = postSolution
SolutionGuid = {88355922-E70A-4B73-B7F8-ABF8F2B59789}
diff --git a/samples/Consuming/Program.cs b/samples/Consuming/Program.cs
index ed2bb74..c16dda1 100644
--- a/samples/Consuming/Program.cs
+++ b/samples/Consuming/Program.cs
@@ -75,6 +75,6 @@ internal static class Program
};
var topic = stateChanged.Consumer.Topic;
- Console.WriteLine($"The consumer for topic '{topic}' " + stateMessage);
+ Console.WriteLine($"The consumer for topic '{topic}' {stateMessage}");
}
}
diff --git a/samples/Producing/Program.cs b/samples/Producing/Program.cs
index 14749b7..0dce9f5 100644
--- a/samples/Producing/Program.cs
+++ b/samples/Producing/Program.cs
@@ -78,6 +78,6 @@ internal static class Program
};
var topic = stateChanged.Producer.Topic;
- Console.WriteLine($"The producer for topic '{topic}' " + stateMessage);
+ Console.WriteLine($"The producer for topic '{topic}' {stateMessage}");
}
}
diff --git a/samples/Reading/Program.cs b/samples/Reading/Program.cs
index b76b1b8..d00ad5c 100644
--- a/samples/Reading/Program.cs
+++ b/samples/Reading/Program.cs
@@ -73,6 +73,6 @@ internal static class Program
};
var topic = stateChanged.Reader.Topic;
- Console.WriteLine($"The reader for topic '{topic}' " + stateMessage);
+ Console.WriteLine($"The reader for topic '{topic}' {stateMessage}");
}
}
diff --git a/src/DotPulsar/Internal/Connection.cs b/src/DotPulsar/Internal/Connection.cs
index 5b35c7d..adb1d34 100644
--- a/src/DotPulsar/Internal/Connection.cs
+++ b/src/DotPulsar/Internal/Connection.cs
@@ -86,6 +86,8 @@ public sealed class Connection : IConnection
private async Task Send(CommandAuthResponse command, CancellationToken cancellationToken)
{
+ await Task.Yield();
+
if (_authentication is not null)
{
if (command.Response is null)
@@ -308,7 +310,7 @@ public sealed class Connection : IConnection
if (command.CommandType == BaseCommand.Type.Message)
_channelManager.Incoming(command.Message, new ReadOnlySequence<byte>(frame.Slice(commandSize + 4).ToArray()));
else if (command.CommandType == BaseCommand.Type.AuthChallenge)
- await Send(new CommandAuthResponse(), cancellationToken).ConfigureAwait(false);
+ _ = Send(new CommandAuthResponse(), cancellationToken).ConfigureAwait(false);
else
_channelManager.Incoming(command);
}
diff --git a/tests/DotPulsar.IntegrationTests/Abstraction/IPulsarService.cs b/tests/DotPulsar.IntegrationTests/Abstraction/IPulsarService.cs
deleted file mode 100644
index 4c9332f..0000000
--- a/tests/DotPulsar.IntegrationTests/Abstraction/IPulsarService.cs
+++ /dev/null
@@ -1,43 +0,0 @@
-/*
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-namespace DotPulsar.IntegrationTests.Abstraction;
-
-using System;
-using System.Net.Http;
-using System.Threading.Tasks;
-using Xunit;
-
-/// <summary>
-/// Pulsar Service interface
-/// </summary>
-public interface IPulsarService : IAsyncLifetime
-{
- /// <summary>
- /// Get broker binary protocol uri
- /// </summary>
- Uri GetBrokerUri();
-
- /// <summary>
- /// Get broker rest uri
- /// </summary>
- Uri GetWebServiceUri();
-
- /// <summary>
- /// Create a partitioned topic
- /// The format of the restTopic must be `{schema}/{tenant}/{namespace}/{topicName}`
- /// For example, `persistent/public/default/test-topic`
- /// </summary>
- Task<HttpResponseMessage?> CreatePartitionedTopic(string restTopic, int numPartitions);
-}
diff --git a/tests/DotPulsar.IntegrationTests/DotPulsar.IntegrationTests.csproj b/tests/DotPulsar.IntegrationTests/DotPulsar.IntegrationTests.csproj
index 7ce98c4..e7335e5 100644
--- a/tests/DotPulsar.IntegrationTests/DotPulsar.IntegrationTests.csproj
+++ b/tests/DotPulsar.IntegrationTests/DotPulsar.IntegrationTests.csproj
@@ -14,7 +14,7 @@
<IncludeAssets>runtime; build; native; contentfiles; analyzers; buildtransitive</IncludeAssets>
<PrivateAssets>all</PrivateAssets>
</PackageReference>
- <PackageReference Include="coverlet.collector" Version="3.1.0">
+ <PackageReference Include="coverlet.collector" Version="3.1.1">
<IncludeAssets>runtime; build; native; contentfiles; analyzers; buildtransitive</IncludeAssets>
<PrivateAssets>all</PrivateAssets>
</PackageReference>
@@ -22,13 +22,11 @@
<ItemGroup>
<ProjectReference Include="..\..\src\DotPulsar\DotPulsar.csproj" />
+ <ProjectReference Include="..\DotPulsar.TestHelpers\DotPulsar.TestHelpers.csproj" />
</ItemGroup>
<ItemGroup>
- <None Update="docker-compose-standalone-tests.yml">
- <CopyToOutputDirectory>Always</CopyToOutputDirectory>
- </None>
- <None Update="docker-compose-standalone-token-tests.yml">
+ <None Update="standalone.yml">
<CopyToOutputDirectory>Always</CopyToOutputDirectory>
</None>
<None Update="appdata\my-secret.key">
diff --git a/tests/DotPulsar.IntegrationTests/Fixtures/StandaloneClusterFixture.cs b/tests/DotPulsar.IntegrationTests/Fixtures/StandaloneClusterFixture.cs
deleted file mode 100644
index eeb3090..0000000
--- a/tests/DotPulsar.IntegrationTests/Fixtures/StandaloneClusterFixture.cs
+++ /dev/null
@@ -1,45 +0,0 @@
-/*
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-namespace DotPulsar.IntegrationTests.Fixtures;
-
-using Abstraction;
-using Services;
-using System.Threading.Tasks;
-using Xunit;
-using Xunit.Abstractions;
-
-public class StandaloneClusterFixture : IAsyncLifetime
-{
- private readonly IMessageSink _messageSink;
-
- public StandaloneClusterFixture(IMessageSink messageSink)
- {
- _messageSink = messageSink;
- }
-
- public IPulsarService? PulsarService { private set; get; }
-
- public async Task InitializeAsync()
- {
- PulsarService = ServiceFactory.CreatePulsarService(_messageSink);
- await PulsarService.InitializeAsync();
- }
-
- public async Task DisposeAsync()
- {
- if (PulsarService != null)
- await PulsarService.DisposeAsync();
- }
-}
diff --git a/tests/DotPulsar.IntegrationTests/Fixtures/StandaloneTokenClusterFixture.cs b/tests/DotPulsar.IntegrationTests/Fixtures/StandaloneTokenClusterFixture.cs
deleted file mode 100644
index f2c7884..0000000
--- a/tests/DotPulsar.IntegrationTests/Fixtures/StandaloneTokenClusterFixture.cs
+++ /dev/null
@@ -1,99 +0,0 @@
-/*
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-namespace DotPulsar.IntegrationTests.Fixtures;
-
-using Abstraction;
-using Services;
-using System;
-using System.Net.Http;
-using System.Net.Http.Headers;
-using System.Threading.Tasks;
-using Xunit.Abstractions;
-using Xunit.Sdk;
-
-public class StandaloneTokenClusterFixture : PulsarServiceBase
-{
- private readonly IMessageSink _messageSink;
-
- public StandaloneTokenClusterFixture(IMessageSink messageSink) : base(messageSink)
- {
- _messageSink = messageSink;
- }
-
- public IPulsarService PulsarService => this;
-
- public override async Task InitializeAsync()
- {
- await TakeDownPulsar(); // clean-up if anything was left running from previous run
-
- await ProcessAsyncHelper
- .ExecuteShellCommand("docker-compose", "-f docker-compose-standalone-token-tests.yml up -d")
- .ThrowOnFailure();
-
- var waitTries = 10;
-
- using var handler = new HttpClientHandler { AllowAutoRedirect = true };
-
- using var client = new HttpClient(handler);
-
- var token = await GetAuthToken(false);
-
- client.DefaultRequestHeaders.Authorization = new AuthenticationHeaderValue("Bearer", token);
-
- while (waitTries > 0)
- {
- try
- {
- await client.GetAsync($"{PulsarService.GetWebServiceUri()}/metrics/").ConfigureAwait(false);
- return;
- }
- catch (Exception e)
- {
- _messageSink.OnMessage(new DiagnosticMessage("Error trying to fetch metrics: {0}", e));
- waitTries--;
- await Task.Delay(5000).ConfigureAwait(false);
- }
- }
-
- throw new Exception("Unable to confirm Pulsar has initialized");
- }
-
- protected override async Task OnDispose()
- => await TakeDownPulsar();
-
- public override Uri GetBrokerUri() => new("pulsar://localhost:54547");
-
- public override Uri GetWebServiceUri() => new("http://localhost:54548");
-
- private Task TakeDownPulsar()
- => ProcessAsyncHelper
- .ExecuteShellCommand("docker-compose", "-f docker-compose-standalone-token-tests.yml down")
- .LogFailure(s => MessageSink.OnMessage(new DiagnosticMessage("Error bringing down container: {0}", s)));
-
- public static async Task<string> GetAuthToken(bool includeExpiry)
- {
- var arguments = "exec pulsar-tokens bin/pulsar tokens create --secret-key file:///appdata/my-secret.key --subject test-user";
-
- if (includeExpiry)
- arguments += " --expiry-time 10s";
-
- var tokenCreateRequest = await ProcessAsyncHelper.ExecuteShellCommand("docker", arguments);
-
- if (!tokenCreateRequest.Completed)
- throw new InvalidOperationException($"Getting token from container failed: {tokenCreateRequest.Output}");
-
- return tokenCreateRequest.Output;
- }
-}
diff --git a/tests/DotPulsar.IntegrationTests/Fixtures/StandaloneTokenClusterTests.cs b/tests/DotPulsar.IntegrationTests/Fixtures/StandaloneTokenClusterTests.cs
deleted file mode 100644
index 612e79c..0000000
--- a/tests/DotPulsar.IntegrationTests/Fixtures/StandaloneTokenClusterTests.cs
+++ /dev/null
@@ -1,20 +0,0 @@
-/*
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-namespace DotPulsar.IntegrationTests.Fixtures;
-
-using Xunit;
-
-[CollectionDefinition(nameof(StandaloneTokenClusterTests))]
-public class StandaloneTokenClusterTests : ICollectionFixture<StandaloneTokenClusterFixture> { }
diff --git a/tests/DotPulsar.IntegrationTests/ProducerTests.cs b/tests/DotPulsar.IntegrationTests/ProducerTests.cs
index 6c1ac25..10b6a02 100644
--- a/tests/DotPulsar.IntegrationTests/ProducerTests.cs
+++ b/tests/DotPulsar.IntegrationTests/ProducerTests.cs
@@ -14,36 +14,33 @@
namespace DotPulsar.IntegrationTests;
-using Abstraction;
using Abstractions;
using Extensions;
-using Fixtures;
using FluentAssertions;
using System;
using System.Collections.Generic;
-using System.Diagnostics;
+using System.Threading;
using System.Threading.Tasks;
using Xunit;
using Xunit.Abstractions;
-[Collection(nameof(StandaloneClusterTest))]
+[Collection(nameof(StandaloneCollection))]
public class ProducerTests
{
private readonly ITestOutputHelper _testOutputHelper;
- private readonly IPulsarService _pulsarService;
+ private readonly StandaloneFixture _fixture;
- public ProducerTests(ITestOutputHelper outputHelper, StandaloneClusterFixture fixture)
+ public ProducerTests(ITestOutputHelper outputHelper, StandaloneFixture fixture)
{
_testOutputHelper = outputHelper;
- Debug.Assert(fixture.PulsarService != null, "fixture.PulsarService != null");
- _pulsarService = fixture.PulsarService;
+ _fixture = fixture;
}
[Fact]
public async Task SimpleProduceConsume_WhenSendingMessagesToProducer_ThenReceiveMessagesFromConsumer()
{
//Arrange
- await using var client = PulsarClient.Builder().ServiceUrl(_pulsarService.GetBrokerUri()).Build();
+ await using var client = CreateClient();
string topicName = $"simple-produce-consume{Guid.NewGuid():N}";
const string content = "test-message";
@@ -69,13 +66,12 @@ public class ProducerTests
public async Task SinglePartition_WhenSendMessages_ThenGetMessagesFromSinglePartition()
{
//Arrange
- var serviceUrl = _pulsarService.GetBrokerUri();
const string content = "test-message";
const int partitions = 3;
const int msgCount = 3;
var topicName = $"single-partitioned-{Guid.NewGuid():N}";
- await _pulsarService.CreatePartitionedTopic($"persistent/public/default/{topicName}", partitions);
- await using var client = PulsarClient.Builder().ServiceUrl(serviceUrl).Build();
+ await _fixture.CreatePartitionedTopic($"persistent/public/default/{topicName}", partitions);
+ await using var client = CreateClient();
//Act
var consumers = new List<IConsumer<string>>();
@@ -120,19 +116,19 @@ public class ProducerTests
public async Task RoundRobinPartition_WhenSendMessages_ThenGetMessagesFromPartitionsInOrder()
{
//Arrange
- await using var client = PulsarClient.Builder().ServiceUrl(_pulsarService.GetBrokerUri()).Build();
+ await using var client = CreateClient();
+
string topicName = $"round-robin-partitioned-{Guid.NewGuid():N}";
const string content = "test-message";
const int partitions = 3;
var consumers = new List<IConsumer<string>>();
- await _pulsarService.CreatePartitionedTopic($"persistent/public/default/{topicName}", partitions);
+ await _fixture.CreatePartitionedTopic($"persistent/public/default/{topicName}", partitions);
//Act
await using var producer = client.NewProducer(Schema.String)
.Topic(topicName)
.Create();
- await producer.StateChangedTo(ProducerState.Connected);
for (var i = 0; i < partitions; ++i)
{
@@ -151,4 +147,12 @@ public class ProducerTests
(await consumers[i].Receive()).Value().Should().Be($"{content}-{i}");
}
}
+
+ private IPulsarClient CreateClient()
+ => PulsarClient
+ .Builder()
+ .Authentication(AuthenticationFactory.Token(async ct => await _fixture.GetToken(Timeout.InfiniteTimeSpan)))
+ .ExceptionHandler(ec => _testOutputHelper.WriteLine($"Exception: {ec.Exception}"))
+ .ServiceUrl(_fixture.ServiceUrl)
+ .Build();
}
diff --git a/tests/DotPulsar.IntegrationTests/Services/PulsarServiceBase.cs b/tests/DotPulsar.IntegrationTests/Services/PulsarServiceBase.cs
deleted file mode 100644
index c614a84..0000000
--- a/tests/DotPulsar.IntegrationTests/Services/PulsarServiceBase.cs
+++ /dev/null
@@ -1,68 +0,0 @@
-/*
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-namespace DotPulsar.IntegrationTests.Services;
-
-using Abstraction;
-using System;
-using System.Net.Http;
-using System.Text;
-using System.Threading;
-using System.Threading.Tasks;
-using Xunit.Abstractions;
-using Xunit.Sdk;
-
-public abstract class PulsarServiceBase : IPulsarService
-{
- protected readonly IMessageSink MessageSink;
- private readonly CancellationTokenSource _cts;
- private readonly HttpClient _adminClient;
-
- protected PulsarServiceBase(IMessageSink messageSink)
- {
- MessageSink = messageSink;
- _cts = new CancellationTokenSource();
- _adminClient = new HttpClient();
- }
-
- public abstract Task InitializeAsync();
-
- public async Task DisposeAsync()
- {
- _adminClient.Dispose();
- _cts.Dispose();
-
- try
- {
- await OnDispose();
- }
- catch (Exception e)
- {
- MessageSink.OnMessage(new DiagnosticMessage("Error disposing: {0}", e));
- }
- }
-
- protected virtual Task OnDispose()
- => Task.CompletedTask;
-
- public abstract Uri GetBrokerUri();
-
- public abstract Uri GetWebServiceUri();
-
- public async Task<HttpResponseMessage?> CreatePartitionedTopic(string restTopic, int numPartitions)
- {
- var content = new StringContent(numPartitions.ToString(), Encoding.UTF8, "application/json");
- return await _adminClient.PutAsync($"{GetWebServiceUri()}admin/v2/{restTopic}/partitions", content, _cts.Token).ConfigureAwait(false);
- }
-}
diff --git a/tests/DotPulsar.IntegrationTests/Services/ServiceFactory.cs b/tests/DotPulsar.IntegrationTests/Services/ServiceFactory.cs
deleted file mode 100644
index 90e10ec..0000000
--- a/tests/DotPulsar.IntegrationTests/Services/ServiceFactory.cs
+++ /dev/null
@@ -1,36 +0,0 @@
-/*
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-namespace DotPulsar.IntegrationTests.Services;
-
-using Abstraction;
-using Xunit.Abstractions;
-
-public static class ServiceFactory
-{
- private const string _pulsarDeploymentType = "PULSAR_DEPLOYMENT_TYPE";
- private const string _containerDeployment = "container";
-
- public static IPulsarService CreatePulsarService(IMessageSink messageSink)
- {
- var deploymentType = System.Environment.GetEnvironmentVariable(_pulsarDeploymentType);
-
- if (deploymentType == _containerDeployment)
- {
- return new StandaloneContainerService(messageSink);
- }
-
- return new StandaloneExternalService(messageSink);
- }
-}
diff --git a/tests/DotPulsar.IntegrationTests/Services/StandaloneContainerService.cs b/tests/DotPulsar.IntegrationTests/Services/StandaloneContainerService.cs
deleted file mode 100644
index 4d7d42e..0000000
--- a/tests/DotPulsar.IntegrationTests/Services/StandaloneContainerService.cs
+++ /dev/null
@@ -1,68 +0,0 @@
-/*
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-namespace DotPulsar.IntegrationTests.Services;
-
-using System;
-using System.Net.Http;
-using System.Threading.Tasks;
-using Xunit.Abstractions;
-using Xunit.Sdk;
-
-public sealed class StandaloneContainerService : PulsarServiceBase
-{
- public StandaloneContainerService(IMessageSink messageSink) : base(messageSink) { }
-
- public override async Task InitializeAsync()
- {
- await TakeDownPulsar(); // clean-up if anything was left running from previous run
-
- await ProcessAsyncHelper.ExecuteShellCommand("docker-compose", "-f docker-compose-standalone-tests.yml up -d")
- .ThrowOnFailure();
-
- var waitTries = 10;
-
- using var handler = new HttpClientHandler { AllowAutoRedirect = true };
-
- using var client = new HttpClient(handler);
-
- while (waitTries > 0)
- {
- try
- {
- await client.GetAsync("http://localhost:54546/metrics/").ConfigureAwait(false);
- return;
- }
- catch
- {
- waitTries--;
- await Task.Delay(5000).ConfigureAwait(false);
- }
- }
-
- throw new Exception("Unable to confirm Pulsar has initialized");
- }
-
- protected override Task OnDispose() => TakeDownPulsar();
-
- private Task TakeDownPulsar()
- => ProcessAsyncHelper.ExecuteShellCommand("docker-compose", "-f docker-compose-standalone-tests.yml down")
- .LogFailure(s => MessageSink.OnMessage(new DiagnosticMessage("Error bringing down container: {0}", s)));
-
- public override Uri GetBrokerUri()
- => new("pulsar://localhost:54545");
-
- public override Uri GetWebServiceUri()
- => new("http://localhost:54546");
-}
diff --git a/tests/DotPulsar.IntegrationTests/Services/StandaloneExternalService.cs b/tests/DotPulsar.IntegrationTests/Services/StandaloneExternalService.cs
deleted file mode 100644
index c1ce2f0..0000000
--- a/tests/DotPulsar.IntegrationTests/Services/StandaloneExternalService.cs
+++ /dev/null
@@ -1,31 +0,0 @@
-/*
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-namespace DotPulsar.IntegrationTests.Services;
-
-using System;
-using System.Threading.Tasks;
-using Xunit.Abstractions;
-
-public sealed class StandaloneExternalService : PulsarServiceBase
-{
- public StandaloneExternalService(IMessageSink messageSink) : base(messageSink) { }
- public override Task InitializeAsync() => Task.CompletedTask;
-
- public override Uri GetBrokerUri()
- => new("pulsar://localhost:6650");
-
- public override Uri GetWebServiceUri()
- => new("http://localhost:8080");
-}
diff --git a/tests/DotPulsar.IntegrationTests/Fixtures/StandaloneClusterTests.cs b/tests/DotPulsar.IntegrationTests/StandaloneCollection.cs
similarity index 75%
rename from tests/DotPulsar.IntegrationTests/Fixtures/StandaloneClusterTests.cs
rename to tests/DotPulsar.IntegrationTests/StandaloneCollection.cs
index 1a3504b..04b2e07 100644
--- a/tests/DotPulsar.IntegrationTests/Fixtures/StandaloneClusterTests.cs
+++ b/tests/DotPulsar.IntegrationTests/StandaloneCollection.cs
@@ -12,9 +12,9 @@
* limitations under the License.
*/
-namespace DotPulsar.IntegrationTests.Fixtures;
+namespace DotPulsar.IntegrationTests;
using Xunit;
-[CollectionDefinition(nameof(StandaloneClusterTest))]
-public class StandaloneClusterTest : ICollectionFixture<StandaloneClusterFixture> { }
+[CollectionDefinition(nameof(StandaloneCollection))]
+public sealed class StandaloneCollection : ICollectionFixture<StandaloneFixture> { }
diff --git a/tests/DotPulsar.IntegrationTests/StandaloneFixture.cs b/tests/DotPulsar.IntegrationTests/StandaloneFixture.cs
new file mode 100644
index 0000000..b5d898b
--- /dev/null
+++ b/tests/DotPulsar.IntegrationTests/StandaloneFixture.cs
@@ -0,0 +1,121 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace DotPulsar.IntegrationTests;
+
+using DotPulsar.TestHelpers;
+using System;
+using System.Net.Http;
+using System.Net.Http.Headers;
+using System.Text;
+using System.Threading;
+using System.Threading.Tasks;
+using Xunit;
+using Xunit.Abstractions;
+using Xunit.Sdk;
+
+public sealed class StandaloneFixture : IAsyncLifetime
+{
+ private readonly string _containerName;
+ private readonly Uri _webServiceUri;
+ private readonly CancellationTokenSource _cts;
+ private readonly IMessageSink _messageSink;
+
+ public StandaloneFixture(IMessageSink messageSink)
+ {
+ _containerName = "standalone";
+ _webServiceUri = new("http://localhost:54548");
+ ServiceUrl = new("pulsar://localhost:54547");
+ _cts = new CancellationTokenSource();
+ _messageSink = messageSink;
+ }
+
+ public Uri ServiceUrl { get; }
+
+ public async Task DisposeAsync()
+ {
+ _cts.Cancel();
+ await TakeDownPulsar();
+ }
+
+ public async Task InitializeAsync()
+ {
+ await TakeDownPulsar(); // clean-up if anything was left running from previous run
+
+ await ProcessAsyncHelper
+ .ExecuteShellCommand("docker-compose", $"-f {_containerName}.yml up -d")
+ .ThrowOnFailure();
+
+ using var handler = new HttpClientHandler { AllowAutoRedirect = true };
+ using var client = new HttpClient(handler);
+
+ var token = await GetToken(Timeout.InfiniteTimeSpan);
+ client.DefaultRequestHeaders.Authorization = new AuthenticationHeaderValue("Bearer", token);
+
+ var waitTries = 10;
+ while (waitTries > 0)
+ {
+ try
+ {
+ var requestUri = new Uri(_webServiceUri, "metrics/");
+ var response = await client.GetAsync(requestUri);
+ if (response.IsSuccessStatusCode)
+ return;
+ }
+ catch (Exception e)
+ {
+ _messageSink.OnMessage(new DiagnosticMessage("Error trying to fetch metrics: {0}", e));
+ }
+
+ waitTries--;
+ await Task.Delay(TimeSpan.FromSeconds(10));
+ }
+
+ throw new Exception("Unable to confirm Pulsar has initialized");
+ }
+
+ public async Task<string> GetToken(TimeSpan expiryTime)
+ {
+ var arguments = $"exec {_containerName} bin/pulsar tokens create --secret-key file:///appdata/my-secret.key --subject test-user";
+
+ if (expiryTime != Timeout.InfiniteTimeSpan)
+ arguments += $" --expiry-time {expiryTime.TotalSeconds}s";
+
+ var tokenCreateRequest = await ProcessAsyncHelper.ExecuteShellCommand("docker", arguments);
+
+ if (tokenCreateRequest.Completed)
+ return tokenCreateRequest.Output;
+
+ throw new InvalidOperationException($"Getting token from container failed: {tokenCreateRequest.Output}");
+ }
+
+ public async Task CreatePartitionedTopic(string restTopic, int numPartitions)
+ {
+ using var client = new HttpClient();
+ var token = await GetToken(Timeout.InfiniteTimeSpan);
+ client.DefaultRequestHeaders.Authorization = new AuthenticationHeaderValue("Bearer", token);
+ var requestUri = new Uri(_webServiceUri, $"admin/v2/{restTopic}/partitions");
+ var content = new StringContent(numPartitions.ToString(), Encoding.UTF8, "application/json");
+ var response = await client.PutAsync(requestUri, content, _cts.Token);
+ if (response.IsSuccessStatusCode)
+ return;
+
+ throw new Exception($"Could not create the partition topic. Got status code {response.StatusCode}");
+ }
+
+ private async Task TakeDownPulsar()
+ => await ProcessAsyncHelper
+ .ExecuteShellCommand("docker-compose", $"-f {_containerName}.yml down")
+ .LogFailure(s => _messageSink.OnMessage(new DiagnosticMessage($"Error bringing down container: {s}")));
+}
diff --git a/tests/DotPulsar.IntegrationTests/TokenRefreshTests.cs b/tests/DotPulsar.IntegrationTests/TokenRefreshTests.cs
deleted file mode 100644
index e244cab..0000000
--- a/tests/DotPulsar.IntegrationTests/TokenRefreshTests.cs
+++ /dev/null
@@ -1,191 +0,0 @@
-/*
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-namespace DotPulsar.IntegrationTests;
-
-using Abstraction;
-using Abstractions;
-using Extensions;
-using Fixtures;
-using System;
-using System.Collections.Generic;
-using System.Diagnostics;
-using System.Linq;
-using System.Threading;
-using System.Threading.Tasks;
-using Xunit;
-using Xunit.Abstractions;
-
-[Collection(nameof(StandaloneTokenClusterTests))]
-public class TokenRefreshTests
-{
- public enum TokenTestRefreshType
- {
- Standard,
- FailAtStartup,
- FailOnRefresh,
- TimeoutOnRefresh
- }
-
- private const string MyTopic = "persistent://public/default/mytopic";
- private readonly ITestOutputHelper _testOutputHelper;
- private readonly IPulsarService _pulsarService;
-
- public TokenRefreshTests(ITestOutputHelper outputHelper, StandaloneTokenClusterFixture fixture)
- {
- _testOutputHelper = outputHelper;
- Debug.Assert(fixture.PulsarService != null, "fixture.PulsarService != null");
- _pulsarService = fixture.PulsarService;
- }
-
- [InlineData(TokenTestRefreshType.Standard, 0)] // Standard happy path with no token refresh failures
- [InlineData(TokenTestRefreshType.FailAtStartup, 1)] // 1 Failure at startup, not on refresh
- [InlineData(TokenTestRefreshType.FailOnRefresh, 2)] // Fails on refresh which will force a reconnection and fail once more on new connection
- [InlineData(TokenTestRefreshType.TimeoutOnRefresh, 0)] // Connection will be disconnected by server due to slow response to auth challenge
- [Theory]
- public async Task TestExpiryRefresh(TokenTestRefreshType refreshType, int timesToFail)
- {
- var publishingStarted = false;
- var delayedNames = new HashSet<string>();
- ValueTask<string> GetToken(string name, ref int count)
- {
- if (refreshType is TokenTestRefreshType.Standard)
- {
- return GetAuthToken(name);
- }
-
- if (refreshType is TokenTestRefreshType.FailAtStartup && !publishingStarted && ++count <= timesToFail)
- {
- return ValueTask.FromException<string>(new Exception("Initial Token Failed"));
- }
-
- if (refreshType is TokenTestRefreshType.FailOnRefresh && publishingStarted && ++count <= timesToFail)
- {
- return ValueTask.FromException<string>(count == 1 ? new Exception("Refresh Failed") : new Exception("Initial Token Failed"));
- }
-
- if (refreshType is TokenTestRefreshType.TimeoutOnRefresh && publishingStarted && !delayedNames.Contains(name))
- {
- delayedNames.Add(name);
- Task.Delay(6000);
- }
-
- return GetAuthToken(name);
- }
-
- var producerTokenCount = 0;
- await using var producerClient = GetPulsarClient("Producer", (ct) => GetToken("Producer", ref producerTokenCount));
-
- var consumerTokenCount = 0;
- await using var consumerClient = GetPulsarClient("Consumer", (ct) => GetToken("Consumer", ref consumerTokenCount));
-
- await using var producer = producerClient.NewProducer(Schema.String)
- .Topic(MyTopic)
- .StateChangedHandler(Monitor)
- .Create();
-
- await using var consumer = consumerClient.NewConsumer(Schema.String)
- .Topic(MyTopic)
- .StateChangedHandler(Monitor)
- .SubscriptionName("test-sub")
- .InitialPosition(SubscriptionInitialPosition.Earliest)
- .Create();
-
- const int messageCount = 20;
- var received = new List<string>(messageCount);
-
- var publisherTask = Task.Run(async () =>
- {
- for (var i = 0; i < messageCount; i++)
- {
- _testOutputHelper.WriteLine("Trying to publish message for index {0}", i);
- var messageId = await producer.Send(i.ToString());
- publishingStarted = true;
- _testOutputHelper.WriteLine("Published message {0} for index {1}", messageId, i);
- await Task.Delay(1000);
- }
- });
-
- var consumerTask = Task.Run(async () =>
- {
- for (var i = 0; i < messageCount; i++)
- {
- var message = await consumer.Receive();
- received.Add(message.Value());
- }
- });
-
- var timeoutTask = Task.Delay(60_000);
- await Task.WhenAny(Task.WhenAll(consumerTask, publisherTask), timeoutTask);
- Assert.False(timeoutTask.IsCompleted);
-
- var expected = Enumerable.Range(0, messageCount).Select(i => i.ToString()).ToList();
- var missing = expected.Except(received).ToList();
-
- if (missing.Count > 0)
- {
- Assert.True(false, $"Missing values: {string.Join(",", missing)}");
- }
- }
-
- private IPulsarClient GetPulsarClient(string name, Func<CancellationToken, ValueTask<string>> tokenFactory)
- => PulsarClient.Builder()
- .Authentication(AuthenticationFactory.Token(tokenFactory))
- .RetryInterval(TimeSpan.FromSeconds(1))
- .ExceptionHandler(ec =>
- {
- _testOutputHelper.WriteLine("Error (handled={0}) occurred in {1} client: {2}", ec.ExceptionHandled, name, ec.Exception);
- })
- .ServiceUrl(_pulsarService.GetBrokerUri()).Build();
-
- private async ValueTask<string> GetAuthToken(string name)
- {
- var result = await StandaloneTokenClusterFixture.GetAuthToken(true);
- _testOutputHelper.WriteLine("{0} received token {1}", name, result);
- return result;
- }
-
- private void Monitor(ProducerStateChanged stateChanged, CancellationToken cancellationToken)
- {
- var stateMessage = stateChanged.ProducerState switch
- {
- ProducerState.Connected => "is connected",
- ProducerState.Disconnected => "is disconnected",
- ProducerState.PartiallyConnected => "is partially connected",
- ProducerState.Closed => "has closed",
- ProducerState.Faulted => "has faulted",
- _ => $"has an unknown state '{stateChanged.ProducerState}'"
- };
-
- var topic = stateChanged.Producer.Topic;
- _testOutputHelper.WriteLine($"The producer for topic '{topic}' " + stateMessage);
- }
-
- private void Monitor(ConsumerStateChanged stateChanged, CancellationToken cancellationToken)
- {
- var stateMessage = stateChanged.ConsumerState switch
- {
- ConsumerState.Active => "is active",
- ConsumerState.Inactive => "is inactive",
- ConsumerState.Disconnected => "is disconnected",
- ConsumerState.Closed => "has closed",
- ConsumerState.ReachedEndOfTopic => "has reached end of topic",
- ConsumerState.Faulted => "has faulted",
- _ => $"has an unknown state '{stateChanged.ConsumerState}'"
- };
-
- var topic = stateChanged.Consumer.Topic;
- _testOutputHelper.WriteLine($"The consumer for topic '{topic}' " + stateMessage);
- }
-}
diff --git a/tests/DotPulsar.IntegrationTests/TokenTests.cs b/tests/DotPulsar.IntegrationTests/TokenTests.cs
new file mode 100644
index 0000000..f6306e5
--- /dev/null
+++ b/tests/DotPulsar.IntegrationTests/TokenTests.cs
@@ -0,0 +1,162 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace DotPulsar.IntegrationTests;
+
+using DotPulsar.Abstractions;
+using DotPulsar.Exceptions;
+using Extensions;
+using FluentAssertions;
+using System;
+using System.Threading;
+using System.Threading.Tasks;
+using Xunit;
+using Xunit.Abstractions;
+
+/// <summary>
+/// Integration tests that create a client with a token supplier and observe the state
+/// of a producer on <c>persistent://public/default/mytopic</c>.
+/// </summary>
+[Collection(nameof(StandaloneCollection))]
+public class TokenTests
+{
+    private const string MyTopic = "persistent://public/default/mytopic";
+
+    private readonly ITestOutputHelper _testOutputHelper;
+    private readonly StandaloneFixture _fixture;
+
+    public TokenTests(ITestOutputHelper outputHelper, StandaloneFixture fixture)
+        => (_testOutputHelper, _fixture) = (outputHelper, fixture);
+
+    /// <summary>
+    /// The supplier throws on every call; the send is expected to surface that exception
+    /// and the producer is expected to reach the faulted state.
+    /// </summary>
+    [Fact]
+    public async Task TokenSupplier_WhenTokenSupplierInitiallyThrowsAnException_ShouldFaultProducer()
+    {
+        // Arrange
+        await using var client = CreateClient(_ => throw new Exception());
+        await using var producer = CreateProducer(client);
+
+        // Act
+        var thrown = await Record.ExceptionAsync(() => producer.Send("Test").AsTask());
+        var finalState = await producer.OnStateChangeTo(ProducerState.Faulted);
+
+        // Assert
+        thrown.Should().BeOfType<Exception>();
+        finalState.Should().Be(ProducerState.Faulted);
+    }
+
+    /// <summary>
+    /// The supplier hands out 10-second tokens until a first message has been sent and
+    /// throws on every later call; the producer is expected to reach the faulted state.
+    /// </summary>
+    [Fact]
+    public async Task TokenSupplier_WhenTokenSupplierThrowsAnExceptionOnAuthChallenge_ShouldFaultProducer()
+    {
+        // Arrange
+        var shouldThrow = false;
+        await using var client = CreateClient(async _ =>
+        {
+            if (shouldThrow)
+                throw new Exception();
+
+            return await GetLoggedToken();
+        });
+
+        await using var producer = CreateProducer(client);
+
+        // Act
+        await producer.Send("Test"); // Make sure we have a working connection
+        shouldThrow = true; // presumably the next call comes from an auth challenge - see test name
+        var finalState = await producer.OnStateChangeTo(ProducerState.Faulted);
+
+        // Assert
+        finalState.Should().Be(ProducerState.Faulted);
+    }
+
+    /// <summary>
+    /// The supplier waits ten seconds before returning an empty token; the send is expected
+    /// to fail with an <see cref="AuthenticationException"/> and the producer to fault.
+    /// </summary>
+    [Fact]
+    public async Task TokenSupplier_WhenTokenSupplierReturnsToLate_ShouldFaultProducer()
+    {
+        // Arrange
+        await using var client = CreateClient(async cancellationToken =>
+        {
+            await Task.Delay(TimeSpan.FromSeconds(10), cancellationToken);
+            return string.Empty;
+        });
+
+        await using var producer = CreateProducer(client);
+
+        // Act
+        var thrown = await Record.ExceptionAsync(() => producer.Send("Test").AsTask());
+        var finalState = await producer.OnStateChangeTo(ProducerState.Faulted);
+
+        // Assert
+        thrown.Should().BeOfType<AuthenticationException>();
+        finalState.Should().Be(ProducerState.Faulted);
+    }
+
+    /// <summary>
+    /// The supplier always returns a fresh 10-second token; once it has been invoked for
+    /// the third time the producer is expected to be in the connected state.
+    /// </summary>
+    [Fact]
+    public async Task TokenSupplier_WhenTokenSupplierReturnValidToken_ShouldStayConnected()
+    {
+        // Arrange
+        var supplierCalls = 0;
+        var thirdCallSeen = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously);
+
+        await using var client = CreateClient(async _ =>
+        {
+            // Signal at the start of the third invocation, before the token is fetched.
+            if (++supplierCalls == 3)
+                thirdCallSeen.SetResult();
+
+            return await GetLoggedToken();
+        });
+
+        await using var producer = CreateProducer(client);
+
+        // Act
+        await producer.Send("Test"); // Make sure we have a working connection
+        await thirdCallSeen.Task;
+        var finalState = await producer.OnStateChangeTo(ProducerState.Connected);
+
+        // Assert
+        finalState.Should().Be(ProducerState.Connected);
+    }
+
+    /// <summary>Fetches a token with a ten second expiry from the fixture and writes it to the test output.</summary>
+    private async Task<string> GetLoggedToken()
+    {
+        var token = await _fixture.GetToken(TimeSpan.FromSeconds(10));
+        _testOutputHelper.WriteLine($"Received token: {token}");
+        return token;
+    }
+
+    /// <summary>
+    /// Builds a client for the fixture's service URL that authenticates with
+    /// <paramref name="tokenSupplier"/> and writes every reported exception to the test output.
+    /// </summary>
+    private IPulsarClient CreateClient(Func<CancellationToken, ValueTask<string>> tokenSupplier)
+    {
+        var authentication = AuthenticationFactory.Token(tokenSupplier);
+
+        return PulsarClient
+            .Builder()
+            .Authentication(authentication)
+            .ExceptionHandler(context => _testOutputHelper.WriteLine($"Exception: {context.Exception}"))
+            .ServiceUrl(_fixture.ServiceUrl)
+            .Build();
+    }
+
+    /// <summary>Creates a string producer on the test topic whose state changes are logged.</summary>
+    private IProducer<string> CreateProducer(IPulsarClient client)
+    {
+        var builder = client.NewProducer(Schema.String);
+
+        return builder
+            .Topic(MyTopic)
+            .StateChangedHandler(LogProducerState)
+            .Create();
+    }
+
+    /// <summary>Writes a human readable description of a producer state change to the test output.</summary>
+    private void LogProducerState(ProducerStateChanged stateChanged, CancellationToken cancellationToken)
+    {
+        string stateMessage;
+
+        switch (stateChanged.ProducerState)
+        {
+            case ProducerState.Connected:
+                stateMessage = "is connected";
+                break;
+            case ProducerState.Disconnected:
+                stateMessage = "is disconnected";
+                break;
+            case ProducerState.PartiallyConnected:
+                stateMessage = "is partially connected";
+                break;
+            case ProducerState.Closed:
+                stateMessage = "has closed";
+                break;
+            case ProducerState.Faulted:
+                stateMessage = "has faulted";
+                break;
+            default:
+                stateMessage = $"has an unknown state '{stateChanged.ProducerState}'";
+                break;
+        }
+
+        var topic = stateChanged.Producer.Topic;
+        _testOutputHelper.WriteLine($"The producer for topic '{topic}' {stateMessage}");
+    }
+}
diff --git a/tests/DotPulsar.IntegrationTests/docker-compose-standalone-tests.yml b/tests/DotPulsar.IntegrationTests/docker-compose-standalone-tests.yml
deleted file mode 100644
index 0e517e5..0000000
--- a/tests/DotPulsar.IntegrationTests/docker-compose-standalone-tests.yml
+++ /dev/null
@@ -1,21 +0,0 @@
-version: '3.5'
-
-services:
-
- pulsar:
- container_name: pulsar-standalone
- image: 'apachepulsar/pulsar:2.7.0'
- ports:
- - '54546:8080'
- - '54545:6650'
- environment:
- PULSAR_MEM: " -Xms1g -Xmx1g -XX:MaxDirectMemorySize=2g"
- command: |
- /bin/bash -c "bin/apply-config-from-env.py conf/standalone.conf && bin/pulsar standalone --no-functions-worker"
- networks:
- - pulsar-standalone
-
-networks:
- pulsar-standalone:
- name: pulsar-standalone
- driver: bridge
diff --git a/tests/DotPulsar.IntegrationTests/docker-compose-standalone-token-tests.yml b/tests/DotPulsar.IntegrationTests/standalone.yml
similarity index 63%
rename from tests/DotPulsar.IntegrationTests/docker-compose-standalone-token-tests.yml
rename to tests/DotPulsar.IntegrationTests/standalone.yml
index af76c5b..035c1d2 100644
--- a/tests/DotPulsar.IntegrationTests/docker-compose-standalone-token-tests.yml
+++ b/tests/DotPulsar.IntegrationTests/standalone.yml
@@ -2,12 +2,12 @@
services:
- pulsar-tokens:
- container_name: pulsar-tokens
+ standalone:
+ container_name: standalone
image: 'apachepulsar/pulsar:2.7.0'
ports:
- - '54548:8081'
- - '54547:6651'
+ - '54548:8080'
+ - '54547:6650'
volumes:
- ./appdata/:/appdata
environment:
@@ -17,18 +17,10 @@ services:
- authenticationProviders=org.apache.pulsar.broker.authentication.AuthenticationProviderToken
- authenticateOriginalAuthData=false
- brokerClientAuthenticationPlugin=org.apache.pulsar.client.impl.auth.AuthenticationToken
- - brokerClientAuthenticationParameters=token:eyJhbGciOiJIUzI1NiJ9.eyJzdWIiOiJ0ZXN0LXVzZXIifQ.CoBrja1EHr0e2kZKGFS8M-xS2SOC2E08yZmjktvcYOs
+ - brokerClientAuthenticationParameters=token:eyJhbGciOiJIUzI1NiJ9.eyJzdWIiOiJ0ZXN0LXVzZXIifQ.KGnuZLJys6MUSth__ZRYdb4sPIKe9_kRrm2wfZ7Dwrc
- superUserRoles=test-user
- PULSAR_PREFIX_authenticationRefreshCheckSeconds=5
- - webServicePort=8081
- - brokerServicePort=6651
-# - PULSAR_LOG_LEVEL=debug
+ - webServicePort=8080
+ - brokerServicePort=6650
command: |
- /bin/bash -c "bin/apply-config-from-env.py conf/standalone.conf && bin/pulsar standalone --no-functions-worker"
- networks:
- - pulsar-tokens
-
-networks:
- pulsar-tokens:
- name: pulsar-tokens
- driver: bridge
+ /bin/bash -c "bin/apply-config-from-env.py conf/standalone.conf && bin/pulsar standalone --no-functions-worker"
\ No newline at end of file
diff --git a/tests/DotPulsar.StressTests/DotPulsar.StressTests.csproj b/tests/DotPulsar.StressTests/DotPulsar.StressTests.csproj
index 2d5d538..0a7476d 100644
--- a/tests/DotPulsar.StressTests/DotPulsar.StressTests.csproj
+++ b/tests/DotPulsar.StressTests/DotPulsar.StressTests.csproj
@@ -12,7 +12,7 @@
<PrivateAssets>all</PrivateAssets>
<IncludeAssets>runtime; build; native; contentfiles; analyzers; buildtransitive</IncludeAssets>
</PackageReference>
- <PackageReference Include="coverlet.collector" Version="3.1.0">
+ <PackageReference Include="coverlet.collector" Version="3.1.1">
<PrivateAssets>all</PrivateAssets>
<IncludeAssets>runtime; build; native; contentfiles; analyzers; buildtransitive</IncludeAssets>
</PackageReference>
@@ -26,7 +26,7 @@
<ItemGroup>
<ProjectReference Include="..\..\src\DotPulsar\DotPulsar.csproj" />
- <ProjectReference Include="..\DotPulsar.IntegrationTests\DotPulsar.IntegrationTests.csproj" />
+ <ProjectReference Include="..\DotPulsar.TestHelpers\DotPulsar.TestHelpers.csproj" />
</ItemGroup>
</Project>
diff --git a/tests/DotPulsar.StressTests/EnumerableTaskExtensions.cs b/tests/DotPulsar.StressTests/EnumerableTaskExtensions.cs
index 078057d..2437085 100644
--- a/tests/DotPulsar.StressTests/EnumerableTaskExtensions.cs
+++ b/tests/DotPulsar.StressTests/EnumerableTaskExtensions.cs
@@ -44,7 +44,7 @@ public static class EnumerableValueTaskExtensions
[DebuggerStepThrough]
public static async Task<TResult[]> WhenAllAsTask<TResult>(this IEnumerable<ValueTask<TResult>> source) where TResult : notnull
- => await source.WhenAll().ConfigureAwait(false);
+ => await source.WhenAll();
[DebuggerStepThrough]
public static async IAsyncEnumerable<TResult> Enumerate<TResult>(this IEnumerable<ValueTask<TResult>> source) where TResult : notnull
@@ -52,7 +52,7 @@ public static class EnumerableValueTaskExtensions
foreach (var operation in source.Select(GetInfo))
{
yield return operation.Task is not null
- ? await operation.Task.ConfigureAwait(false)
+ ? await operation.Task
: operation.Result;
}
}
diff --git a/tests/DotPulsar.StressTests/Fixtures/StandaloneClusterFixture.cs b/tests/DotPulsar.StressTests/Fixtures/StandaloneClusterFixture.cs
index 500cd84..0236efe 100644
--- a/tests/DotPulsar.StressTests/Fixtures/StandaloneClusterFixture.cs
+++ b/tests/DotPulsar.StressTests/Fixtures/StandaloneClusterFixture.cs
@@ -14,7 +14,7 @@
namespace DotPulsar.StressTests.Fixtures;
-using IntegrationTests;
+using DotPulsar.TestHelpers;
using System;
using System.Net.Http;
using System.Threading.Tasks;
@@ -35,30 +35,30 @@ public class StandaloneClusterFixture : IAsyncLifetime
{
await TakeDownPulsar(); // clean-up if anything was left running from previous run
- await ProcessAsyncHelper.ExecuteShellCommand("docker-compose", "-f docker-compose-standalone-tests.yml up -d")
+ await ProcessAsyncHelper
+ .ExecuteShellCommand("docker-compose", "-f docker-compose-standalone-tests.yml up -d")
.ThrowOnFailure();
- var waitTries = 10;
-
- using var handler = new HttpClientHandler
- {
- AllowAutoRedirect = true
- };
-
+ using var handler = new HttpClientHandler { AllowAutoRedirect = true };
using var client = new HttpClient(handler);
+ var waitTries = 10;
while (waitTries > 0)
{
try
{
- await client.GetAsync("http://localhost:54546/metrics/").ConfigureAwait(false);
- return;
+ var requestUri = "http://localhost:54546/metrics/";
+ var response = await client.GetAsync(requestUri);
+ if (response.IsSuccessStatusCode)
+ return;
}
- catch
+ catch (Exception e)
{
- waitTries--;
- await Task.Delay(5000).ConfigureAwait(false);
+ _messageSink.OnMessage(new DiagnosticMessage("Error trying to fetch metrics: {0}", e));
}
+
+ waitTries--;
+ await Task.Delay(TimeSpan.FromSeconds(10));
}
throw new Exception("Unable to confirm Pulsar has initialized");
@@ -72,11 +72,12 @@ public class StandaloneClusterFixture : IAsyncLifetime
}
catch (Exception e)
{
- _messageSink.OnMessage(new DiagnosticMessage("Error taking down pulsar: {0}", e));
+ _messageSink.OnMessage(new DiagnosticMessage($"Error taking down pulsar: {e}"));
}
}
private Task TakeDownPulsar()
- => ProcessAsyncHelper.ExecuteShellCommand("docker-compose", "-f docker-compose-standalone-tests.yml down")
- .LogFailure(s => _messageSink.OnMessage(new DiagnosticMessage("Error bringing down container: {0}", s)));
+ => ProcessAsyncHelper
+ .ExecuteShellCommand("docker-compose", "-f docker-compose-standalone-tests.yml down")
+ .LogFailure(s => _messageSink.OnMessage(new DiagnosticMessage($"Error bringing down container: {s}")));
}
diff --git a/tests/DotPulsar.StressTests/XunitExceptionHandler.cs b/tests/DotPulsar.StressTests/XunitExceptionHandler.cs
index 59211f4..79102d1 100644
--- a/tests/DotPulsar.StressTests/XunitExceptionHandler.cs
+++ b/tests/DotPulsar.StressTests/XunitExceptionHandler.cs
@@ -38,9 +38,6 @@ internal class XunitExceptionHandler : IHandleException
await _exceptionHandler.OnException(exceptionContext).ConfigureAwait(false);
if (!exceptionContext.ExceptionHandled)
- _output.WriteLine(
- $"{exceptionContext.Exception.GetType().Name} " +
- $"{exceptionContext.Exception.Message}{Environment.NewLine}" +
- $"{exceptionContext.Exception.StackTrace}");
+ _output.WriteLine($"Got exception: {exceptionContext.Exception}");
}
}
diff --git a/tests/DotPulsar.TestHelpers/DotPulsar.TestHelpers.csproj b/tests/DotPulsar.TestHelpers/DotPulsar.TestHelpers.csproj
new file mode 100644
index 0000000..132c02c
--- /dev/null
+++ b/tests/DotPulsar.TestHelpers/DotPulsar.TestHelpers.csproj
@@ -0,0 +1,9 @@
+<Project Sdk="Microsoft.NET.Sdk">
+
+ <PropertyGroup>
+ <TargetFramework>net6.0</TargetFramework>
+ <ImplicitUsings>enable</ImplicitUsings>
+ <Nullable>enable</Nullable>
+ </PropertyGroup>
+
+</Project>
diff --git a/tests/DotPulsar.IntegrationTests/ProcessAsyncHelper.cs b/tests/DotPulsar.TestHelpers/ProcessAsyncHelper.cs
similarity index 92%
rename from tests/DotPulsar.IntegrationTests/ProcessAsyncHelper.cs
rename to tests/DotPulsar.TestHelpers/ProcessAsyncHelper.cs
index c967563..73865cd 100644
--- a/tests/DotPulsar.IntegrationTests/ProcessAsyncHelper.cs
+++ b/tests/DotPulsar.TestHelpers/ProcessAsyncHelper.cs
@@ -12,7 +12,7 @@
* limitations under the License.
*/
-namespace DotPulsar.IntegrationTests;
+namespace DotPulsar.TestHelpers;
using System;
using System.Diagnostics;
@@ -26,9 +26,7 @@ public static class ProcessAsyncHelper
var result = await resultTask;
if (!result.Completed)
- {
- throw new InvalidOperationException($"Process did not complete correctly, {Environment.NewLine}{result.Output}");
- }
+ throw new InvalidOperationException($"Process did not complete correctly: {result.Output}");
}
public static async Task LogFailure(this Task<ProcessResult> resultTask, Action<string> logAction)
@@ -36,15 +34,11 @@ public static class ProcessAsyncHelper
var result = await resultTask;
if (!result.Completed)
- {
logAction(result.Output);
- }
}
public static async Task<ProcessResult> ExecuteShellCommand(string command, string arguments)
{
- var result = new ProcessResult();
-
using var process = new Process();
process.StartInfo.FileName = command;
@@ -60,14 +54,10 @@ public static class ProcessAsyncHelper
process.OutputDataReceived += (s, e) =>
{
// The output stream has been closed i.e. the process has terminated
- if (e.Data == null)
- {
+ if (e.Data is null)
outputCloseEvent.SetResult(true);
- }
else
- {
outputBuilder.Append(e.Data);
- }
};
var errorBuilder = new StringBuilder();
@@ -76,16 +66,13 @@ public static class ProcessAsyncHelper
process.ErrorDataReceived += (s, e) =>
{
// The error stream has been closed i.e. the process has terminated
- if (e.Data == null)
- {
+ if (e.Data is null)
errorCloseEvent.SetResult(true);
- }
else
- {
errorBuilder.Append(e.Data);
- }
};
+ var result = new ProcessResult();
bool isStarted;
try
diff --git a/tests/DotPulsar.Tests/DotPulsar.Tests.csproj b/tests/DotPulsar.Tests/DotPulsar.Tests.csproj
index 4d44986..713a379 100644
--- a/tests/DotPulsar.Tests/DotPulsar.Tests.csproj
+++ b/tests/DotPulsar.Tests/DotPulsar.Tests.csproj
@@ -14,7 +14,7 @@
<PrivateAssets>all</PrivateAssets>
<IncludeAssets>runtime; build; native; contentfiles; analyzers; buildtransitive</IncludeAssets>
</PackageReference>
- <PackageReference Include="coverlet.collector" Version="3.1.0">
+ <PackageReference Include="coverlet.collector" Version="3.1.1">
<PrivateAssets>all</PrivateAssets>
<IncludeAssets>runtime; build; native; contentfiles; analyzers; buildtransitive</IncludeAssets>
</PackageReference>