You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by bl...@apache.org on 2021/06/21 19:33:53 UTC
[pulsar-dotpulsar] branch master updated: Basic partitioned
producer (#71)
This is an automated email from the ASF dual-hosted git repository.
blankensteiner pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-dotpulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 85c2f70 Basic partitioned producer (#71)
85c2f70 is described below
commit 85c2f70b545e7398dbfd3707e5ffc7831303b47e
Author: Zike Yang <ar...@armail.top>
AuthorDate: Tue Jun 22 03:33:46 2021 +0800
Basic partitioned producer (#71)
* Add GetPartitions.
* Add PartitionedProducerProcess.
* Add PartitionedProducerProcess Test.
* Remove PartitionedProducerState.
* Add UpdatePartitions Event.
* Implement PartitionedProducer.
* Refactor CreateProducer
* Fix sub topic format error
* Fix some problems
* Remove PartitionedProducerProcess.
* Make some classes sealed.
* Use await in PartitionedProducer.Send
* Make PulsarClient internal and fix tests.
* Make GetNumberOfPartitions private.
* Fix test method name.
* Use NSubstitute instead of Moq.
* Make CreateSubProducers sync.
* Fix some comments.
* Fix warnings
* Subproducers
* Update
* Change PartitionedProducer to Producer and add some comments.
* Add UpdatePartitions to connect logic and fix some other problems.
* Update Partitions when start process.
* Fix test warnings.
---
samples/Producing/Program.cs | 1 +
.../IMessageRouter.cs} | 25 +---
src/DotPulsar/DotPulsar.csproj | 3 +-
src/DotPulsar/Internal/Abstractions/IConnection.cs | 1 +
src/DotPulsar/Internal/Abstractions/Process.cs | 6 +
src/DotPulsar/Internal/ChannelManager.cs | 3 +
src/DotPulsar/Internal/Connection.cs | 16 +++
.../Events/PartitionedSubProducerStateChanged.cs} | 34 ++---
.../Events/UpdatePartitions.cs} | 33 ++---
.../Internal/Exceptions/LookupNotReadyException.cs | 23 ++++
.../Internal/Extensions/CommandExtensions.cs | 9 ++
src/DotPulsar/Internal/Producer.cs | 142 +++++++++------------
src/DotPulsar/Internal/ProducerBuilder.cs | 9 ++
src/DotPulsar/Internal/ProducerProcess.cs | 99 +++++++++++++-
src/DotPulsar/Internal/PulsarClientFactory.cs | 28 ++++
src/DotPulsar/Internal/RequestResponseHandler.cs | 7 +
.../Internal/{Producer.cs => SubProducer.cs} | 5 +-
src/DotPulsar/ProducerOptions.cs | 6 +
src/DotPulsar/ProducerState.cs | 7 +-
src/DotPulsar/PulsarClient.cs | 54 +++++++-
src/DotPulsar/RoundRobinPartitionRouter.cs | 46 +++++++
src/DotPulsar/SinglePartitionRouter.cs | 51 ++++++++
tests/DotPulsar.Tests/DotPulsar.Tests.csproj | 1 +
.../Internal/PartitionedProducerProcessTests.cs | 111 ++++++++++++++++
tests/DotPulsar.Tests/PulsarClientTests.cs | 75 +++++++++++
25 files changed, 638 insertions(+), 157 deletions(-)
diff --git a/samples/Producing/Program.cs b/samples/Producing/Program.cs
index 5ea9a51..9c2ef0e 100644
--- a/samples/Producing/Program.cs
+++ b/samples/Producing/Program.cs
@@ -71,6 +71,7 @@ namespace Producing
{
ProducerState.Connected => "is connected",
ProducerState.Disconnected => "is disconnected",
+ ProducerState.PartiallyConnected => "has partially connected",
ProducerState.Closed => "has closed",
ProducerState.Faulted => "has faulted",
_ => $"has an unknown state '{stateChanged.ProducerState}'"
diff --git a/src/DotPulsar/ProducerState.cs b/src/DotPulsar/Abstractions/IMessageRouter.cs
similarity index 56%
copy from src/DotPulsar/ProducerState.cs
copy to src/DotPulsar/Abstractions/IMessageRouter.cs
index 5bab409..6652c9c 100644
--- a/src/DotPulsar/ProducerState.cs
+++ b/src/DotPulsar/Abstractions/IMessageRouter.cs
@@ -12,31 +12,16 @@
* limitations under the License.
*/
-namespace DotPulsar
+namespace DotPulsar.Abstractions
{
/// <summary>
- /// The possible states a producer can be in.
+ /// A message routing abstraction
/// </summary>
- public enum ProducerState : byte
+ public interface IMessageRouter
{
/// <summary>
- /// The producer is closed. This is a final state.
+ /// Choose a partition.
/// </summary>
- Closed,
-
- /// <summary>
- /// The producer is connected.
- /// </summary>
- Connected,
-
- /// <summary>
- /// The producer is disconnected.
- /// </summary>
- Disconnected,
-
- /// <summary>
- /// The producer is faulted. This is a final state.
- /// </summary>
- Faulted
+ int ChoosePartition(MessageMetadata? messageMetadata, int partitionsCount);
}
}
diff --git a/src/DotPulsar/DotPulsar.csproj b/src/DotPulsar/DotPulsar.csproj
index 5538b9b..9cce4c3 100644
--- a/src/DotPulsar/DotPulsar.csproj
+++ b/src/DotPulsar/DotPulsar.csproj
@@ -22,7 +22,8 @@
</PropertyGroup>
<ItemGroup>
- <PackageReference Include="Microsoft.Extensions.ObjectPool" Version="5.0.6" />
+ <PackageReference Include="HashDepot" Version="2.0.3" />
+ <PackageReference Include="Microsoft.Extensions.ObjectPool" Version="5.0.6" />
<PackageReference Include="Microsoft.SourceLink.GitHub" Version="1.0.0" PrivateAssets="All" />
<PackageReference Include="protobuf-net" Version="3.0.101" />
<PackageReference Include="System.IO.Pipelines" Version="5.0.1" />
diff --git a/src/DotPulsar/Internal/Abstractions/IConnection.cs b/src/DotPulsar/Internal/Abstractions/IConnection.cs
index 368a111..f6c7092 100644
--- a/src/DotPulsar/Internal/Abstractions/IConnection.cs
+++ b/src/DotPulsar/Internal/Abstractions/IConnection.cs
@@ -41,5 +41,6 @@ namespace DotPulsar.Internal.Abstractions
Task<BaseCommand> Send(CommandCloseConsumer command, CancellationToken cancellationToken);
Task<BaseCommand> Send(SendPackage command, CancellationToken cancellationToken);
Task<BaseCommand> Send(CommandGetOrCreateSchema command, CancellationToken cancellationToken);
+ Task<BaseCommand> Send(CommandPartitionedTopicMetadata command, CancellationToken cancellationToken);
}
}
diff --git a/src/DotPulsar/Internal/Abstractions/Process.cs b/src/DotPulsar/Internal/Abstractions/Process.cs
index 056471a..4c0d3d8 100644
--- a/src/DotPulsar/Internal/Abstractions/Process.cs
+++ b/src/DotPulsar/Internal/Abstractions/Process.cs
@@ -68,11 +68,17 @@ namespace DotPulsar.Internal.Abstractions
case ChannelUnsubscribed _:
ChannelState = ChannelState.Unsubscribed;
break;
+ default:
+ HandleExtend(e);
+ break;
}
CalculateState();
}
protected abstract void CalculateState();
+
+ protected virtual void HandleExtend(IEvent e)
+ => throw new NotImplementedException();
}
}
diff --git a/src/DotPulsar/Internal/ChannelManager.cs b/src/DotPulsar/Internal/ChannelManager.cs
index ed3fa51..8f61858 100644
--- a/src/DotPulsar/Internal/ChannelManager.cs
+++ b/src/DotPulsar/Internal/ChannelManager.cs
@@ -160,6 +160,9 @@ namespace DotPulsar.Internal
public Task<BaseCommand> Outgoing(CommandLookupTopic command)
=> _requestResponseHandler.Outgoing(command);
+ public Task<BaseCommand> Outgoing(CommandPartitionedTopicMetadata command)
+ => _requestResponseHandler.Outgoing(command);
+
public Task<BaseCommand> Outgoing(CommandSeek command)
{
using (TakeConsumerSenderLock(command.ConsumerId))
diff --git a/src/DotPulsar/Internal/Connection.cs b/src/DotPulsar/Internal/Connection.cs
index efc1b12..b7ebc52 100644
--- a/src/DotPulsar/Internal/Connection.cs
+++ b/src/DotPulsar/Internal/Connection.cs
@@ -239,6 +239,22 @@ namespace DotPulsar.Internal
return await responseTask.ConfigureAwait(false);
}
+ public async Task<BaseCommand> Send(CommandPartitionedTopicMetadata command, CancellationToken cancellationToken)
+ {
+ ThrowIfDisposed();
+
+ Task<BaseCommand>? responseTask;
+
+ using (await _lock.Lock(cancellationToken).ConfigureAwait(false))
+ {
+ responseTask = _channelManager.Outgoing(command);
+ var sequence = Serializer.Serialize(command.AsBaseCommand());
+ await _stream.Send(sequence).ConfigureAwait(false);
+ }
+
+ return await responseTask.ConfigureAwait(false);
+ }
+
private async Task Send(BaseCommand command, CancellationToken cancellationToken)
{
ThrowIfDisposed();
diff --git a/src/DotPulsar/ProducerState.cs b/src/DotPulsar/Internal/Events/PartitionedSubProducerStateChanged.cs
similarity index 52%
copy from src/DotPulsar/ProducerState.cs
copy to src/DotPulsar/Internal/Events/PartitionedSubProducerStateChanged.cs
index 5bab409..326641d 100644
--- a/src/DotPulsar/ProducerState.cs
+++ b/src/DotPulsar/Internal/Events/PartitionedSubProducerStateChanged.cs
@@ -12,31 +12,23 @@
* limitations under the License.
*/
-namespace DotPulsar
+namespace DotPulsar.Internal.Events
{
+ using Abstractions;
+ using System;
+
/// <summary>
- /// The possible states a producer can be in.
+ /// Representation of the sub producer of a partitioned producer state change.
/// </summary>
- public enum ProducerState : byte
+ public sealed class PartitionedSubProducerStateChanged : IEvent
{
- /// <summary>
- /// The producer is closed. This is a final state.
- /// </summary>
- Closed,
-
- /// <summary>
- /// The producer is connected.
- /// </summary>
- Connected,
-
- /// <summary>
- /// The producer is disconnected.
- /// </summary>
- Disconnected,
+ public Guid CorrelationId { get; }
+ public ProducerState ProducerState { get; }
- /// <summary>
- /// The producer is faulted. This is a final state.
- /// </summary>
- Faulted
+ public PartitionedSubProducerStateChanged(Guid correlationId, ProducerState producerState)
+ {
+ CorrelationId = correlationId;
+ ProducerState = producerState;
+ }
}
}
diff --git a/src/DotPulsar/ProducerState.cs b/src/DotPulsar/Internal/Events/UpdatePartitions.cs
similarity index 52%
copy from src/DotPulsar/ProducerState.cs
copy to src/DotPulsar/Internal/Events/UpdatePartitions.cs
index 5bab409..b9894b5 100644
--- a/src/DotPulsar/ProducerState.cs
+++ b/src/DotPulsar/Internal/Events/UpdatePartitions.cs
@@ -12,31 +12,24 @@
* limitations under the License.
*/
-namespace DotPulsar
+namespace DotPulsar.Internal.Events
{
+ using Abstractions;
+ using System;
+
/// <summary>
- /// The possible states a producer can be in.
+ /// Representation of the partitions count of the partitioned topic updating.
/// </summary>
- public enum ProducerState : byte
+ public sealed class UpdatePartitions : IEvent
{
- /// <summary>
- /// The producer is closed. This is a final state.
- /// </summary>
- Closed,
-
- /// <summary>
- /// The producer is connected.
- /// </summary>
- Connected,
+ public Guid CorrelationId { get; }
- /// <summary>
- /// The producer is disconnected.
- /// </summary>
- Disconnected,
+ public uint PartitionsCount { get; }
- /// <summary>
- /// The producer is faulted. This is a final state.
- /// </summary>
- Faulted
+ public UpdatePartitions(Guid correlationId, uint partitionsCount)
+ {
+ CorrelationId = correlationId;
+ PartitionsCount = partitionsCount;
+ }
}
}
diff --git a/src/DotPulsar/Internal/Exceptions/LookupNotReadyException.cs b/src/DotPulsar/Internal/Exceptions/LookupNotReadyException.cs
new file mode 100644
index 0000000..13fc562
--- /dev/null
+++ b/src/DotPulsar/Internal/Exceptions/LookupNotReadyException.cs
@@ -0,0 +1,23 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace DotPulsar.Internal.Exceptions
+{
+ using DotPulsar.Exceptions;
+
+ public sealed class LookupNotReadyException : DotPulsarException
+ {
+ public LookupNotReadyException() : base("The topic lookup operation is not ready yet") { }
+ }
+}
diff --git a/src/DotPulsar/Internal/Extensions/CommandExtensions.cs b/src/DotPulsar/Internal/Extensions/CommandExtensions.cs
index 0a76401..ffa9295 100644
--- a/src/DotPulsar/Internal/Extensions/CommandExtensions.cs
+++ b/src/DotPulsar/Internal/Extensions/CommandExtensions.cs
@@ -46,6 +46,9 @@ namespace DotPulsar.Internal.Extensions
public static void Throw(this CommandGetOrCreateSchemaResponse command)
=> Throw(command.ErrorCode, command.ErrorMessage);
+ public static void Throw(this CommandPartitionedTopicMetadataResponse command)
+ => Throw(command.Error, command.Message);
+
private static void Throw(ServerError error, string message)
=> throw (error switch
{
@@ -187,5 +190,11 @@ namespace DotPulsar.Internal.Extensions
CommandType = BaseCommand.Type.GetOrCreateSchema,
GetOrCreateSchema = command
};
+
+ public static BaseCommand AsBaseCommand(this CommandPartitionedTopicMetadata command)
+ => new BaseCommand
+ {
+ CommandType = BaseCommand.Type.PartitionedMetadata, PartitionMetadata = command
+ };
}
}
diff --git a/src/DotPulsar/Internal/Producer.cs b/src/DotPulsar/Internal/Producer.cs
index b130318..d3c3495 100644
--- a/src/DotPulsar/Internal/Producer.cs
+++ b/src/DotPulsar/Internal/Producer.cs
@@ -16,28 +16,26 @@ namespace DotPulsar.Internal
{
using Abstractions;
using DotPulsar.Abstractions;
- using DotPulsar.Exceptions;
- using DotPulsar.Internal.Extensions;
using Events;
- using Microsoft.Extensions.ObjectPool;
+ using Exceptions;
using System;
- using System.Buffers;
+ using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;
public sealed class Producer<TMessage> : IEstablishNewChannel, IProducer<TMessage>
{
- private readonly ObjectPool<PulsarApi.MessageMetadata> _messageMetadataPool;
private readonly Guid _correlationId;
private readonly IRegisterEvent _eventRegister;
- private IProducerChannel _channel;
private readonly IExecute _executor;
private readonly IStateChanged<ProducerState> _state;
- private readonly IProducerChannelFactory _factory;
- private readonly ISchema<TMessage> _schema;
- private readonly SequenceId _sequenceId;
+ private readonly PulsarClient _pulsarClient;
+ private readonly ProducerOptions<TMessage> _options;
+ private readonly ConcurrentDictionary<int, IProducer<TMessage>> _producers;
+ private readonly IMessageRouter _messageRouter;
+ private readonly CancellationTokenSource _cts = new();
+ private int _producersCount;
private int _isDisposed;
-
public Uri ServiceUrl { get; }
public string Topic { get; }
@@ -45,36 +43,50 @@ namespace DotPulsar.Internal
Guid correlationId,
Uri serviceUrl,
string topic,
- ulong initialSequenceId,
IRegisterEvent registerEvent,
- IProducerChannel initialChannel,
IExecute executor,
IStateChanged<ProducerState> state,
- IProducerChannelFactory factory,
- ISchema<TMessage> schema)
+ ProducerOptions<TMessage> options,
+ PulsarClient pulsarClient
+ )
{
- var messageMetadataPolicy = new DefaultPooledObjectPolicy<PulsarApi.MessageMetadata>();
- _messageMetadataPool = new DefaultObjectPool<PulsarApi.MessageMetadata>(messageMetadataPolicy);
_correlationId = correlationId;
ServiceUrl = serviceUrl;
Topic = topic;
- _sequenceId = new SequenceId(initialSequenceId);
_eventRegister = registerEvent;
- _channel = initialChannel;
_executor = executor;
_state = state;
- _factory = factory;
- _schema = schema;
_isDisposed = 0;
+ _options = options;
+ _pulsarClient = pulsarClient;
+ _messageRouter = options.MessageRouter;
- _eventRegister.Register(new ProducerCreated(_correlationId));
+ _producers = new ConcurrentDictionary<int, IProducer<TMessage>>(1, 31);
}
- public async ValueTask<ProducerState> OnStateChangeTo(ProducerState state, CancellationToken cancellationToken)
- => await _state.StateChangedTo(state, cancellationToken).ConfigureAwait(false);
+ private void CreateSubProducers(int startIndex, int count)
+ {
+ if (count == 0)
+ {
+ var producer = _pulsarClient.NewSubProducer(Topic, _options, _executor, _correlationId);
+ _producers[0] = producer;
+ return;
+ }
- public async ValueTask<ProducerState> OnStateChangeFrom(ProducerState state, CancellationToken cancellationToken)
- => await _state.StateChangedFrom(state, cancellationToken).ConfigureAwait(false);
+ for (var i = startIndex; i < count; ++i)
+ {
+ var producer = _pulsarClient.NewSubProducer(Topic, _options, _executor, _correlationId, (uint) i);
+ _producers[i] = producer;
+ }
+ }
+
+ private async void UpdatePartitions(CancellationToken cancellationToken)
+ {
+ var partitionsCount = (int) await _pulsarClient.GetNumberOfPartitions(Topic, cancellationToken).ConfigureAwait(false);
+ _eventRegister.Register(new UpdatePartitions(_correlationId, (uint)partitionsCount));
+ CreateSubProducers(_producers.Count, partitionsCount);
+ _producersCount = partitionsCount;
+ }
public bool IsFinalState()
=> _state.IsFinalState();
@@ -82,78 +94,46 @@ namespace DotPulsar.Internal
public bool IsFinalState(ProducerState state)
=> _state.IsFinalState(state);
+ public async ValueTask<ProducerState> OnStateChangeTo(ProducerState state, CancellationToken cancellationToken = default)
+ => await _state.StateChangedTo(state, cancellationToken).ConfigureAwait(false);
+
+ public async ValueTask<ProducerState> OnStateChangeFrom(ProducerState state, CancellationToken cancellationToken = default)
+ => await _state.StateChangedFrom(state, cancellationToken).ConfigureAwait(false);
+
public async ValueTask DisposeAsync()
{
if (Interlocked.Exchange(ref _isDisposed, 1) != 0)
return;
- _eventRegister.Register(new ProducerDisposed(_correlationId));
- await _channel.ClosedByClient(CancellationToken.None).ConfigureAwait(false);
- await _channel.DisposeAsync().ConfigureAwait(false);
- }
-
- public async ValueTask<MessageId> Send(TMessage message, CancellationToken cancellationToken)
- => await Send(_schema.Encode(message), cancellationToken).ConfigureAwait(false);
+ _cts.Cancel();
+ _cts.Dispose();
- public async ValueTask<MessageId> Send(MessageMetadata metadata, TMessage message, CancellationToken cancellationToken)
- => await Send(metadata, _schema.Encode(message), cancellationToken).ConfigureAwait(false);
-
- public async ValueTask<MessageId> Send(ReadOnlySequence<byte> data, CancellationToken cancellationToken)
- {
- ThrowIfDisposed();
-
- var metadata = _messageMetadataPool.Get();
- try
- {
- metadata.SequenceId = _sequenceId.FetchNext();
- return await _executor.Execute(() => Send(metadata, data, cancellationToken), cancellationToken).ConfigureAwait(false);
- }
- finally
+ foreach (var producer in _producers.Values)
{
- _messageMetadataPool.Return(metadata);
+ await producer.DisposeAsync().ConfigureAwait(false);
}
- }
-
- public async ValueTask<MessageId> Send(MessageMetadata metadata, ReadOnlySequence<byte> data, CancellationToken cancellationToken)
- {
- ThrowIfDisposed();
- var autoAssignSequenceId = metadata.SequenceId == 0;
- if (autoAssignSequenceId)
- metadata.SequenceId = _sequenceId.FetchNext();
-
- try
- {
- return await _executor.Execute(() => Send(metadata.Metadata, data, cancellationToken), cancellationToken).ConfigureAwait(false);
- }
- finally
- {
- if (autoAssignSequenceId)
- metadata.SequenceId = 0;
- }
- }
-
- private async ValueTask<MessageId> Send(PulsarApi.MessageMetadata metadata, ReadOnlySequence<byte> data, CancellationToken cancellationToken)
- {
- var response = await _channel.Send(metadata, data, cancellationToken).ConfigureAwait(false);
- return response.MessageId.ToMessageId();
+ _eventRegister.Register(new ProducerDisposed(_correlationId));
}
public async Task EstablishNewChannel(CancellationToken cancellationToken)
{
- var channel = await _executor.Execute(() => _factory.Create(cancellationToken), cancellationToken).ConfigureAwait(false);
-
- var oldChannel = _channel;
- _channel = channel;
-
- if (oldChannel is not null)
- await oldChannel.DisposeAsync().ConfigureAwait(false);
+ await _executor.Execute(() => UpdatePartitions(cancellationToken), cancellationToken).ConfigureAwait(false);
}
- private void ThrowIfDisposed()
+ private int ChoosePartitions(MessageMetadata? metadata)
{
- if (_isDisposed != 0)
- throw new ProducerDisposedException(GetType().FullName!);
+ if (_producers.IsEmpty)
+ {
+ throw new LookupNotReadyException();
+ }
+ return _producersCount == 0 ? 0 : _messageRouter.ChoosePartition(metadata, _producersCount);
}
+
+ public async ValueTask<MessageId> Send(TMessage message, CancellationToken cancellationToken = default)
+ => await _executor.Execute(() => _producers[ChoosePartitions(null)].Send(message, cancellationToken), cancellationToken).ConfigureAwait(false);
+
+ public async ValueTask<MessageId> Send(MessageMetadata metadata, TMessage message, CancellationToken cancellationToken = default)
+ => await _executor.Execute(() => _producers[ChoosePartitions(metadata)].Send(message, cancellationToken), cancellationToken).ConfigureAwait(false);
}
}
diff --git a/src/DotPulsar/Internal/ProducerBuilder.cs b/src/DotPulsar/Internal/ProducerBuilder.cs
index 420b2a2..95a7bdc 100644
--- a/src/DotPulsar/Internal/ProducerBuilder.cs
+++ b/src/DotPulsar/Internal/ProducerBuilder.cs
@@ -26,6 +26,7 @@ namespace DotPulsar.Internal
private ulong _initialSequenceId;
private string? _topic;
private IHandleStateChanged<ProducerStateChanged>? _stateChangedHandler;
+ private IMessageRouter? _messageRouter;
public ProducerBuilder(IPulsarClient pulsarClient, ISchema<TMessage> schema)
{
@@ -65,6 +66,12 @@ namespace DotPulsar.Internal
return this;
}
+ public IProducerBuilder<TMessage> MessageRouter(IMessageRouter messageRouter)
+ {
+ _messageRouter = messageRouter;
+ return this;
+ }
+
public IProducer<TMessage> Create()
{
if (string.IsNullOrEmpty(_topic))
@@ -78,6 +85,8 @@ namespace DotPulsar.Internal
StateChangedHandler = _stateChangedHandler
};
+ if (_messageRouter != null) options.MessageRouter = _messageRouter;
+
return _pulsarClient.CreateProducer<TMessage>(options);
}
}
diff --git a/src/DotPulsar/Internal/ProducerProcess.cs b/src/DotPulsar/Internal/ProducerProcess.cs
index 7e0b37f..e7230fe 100644
--- a/src/DotPulsar/Internal/ProducerProcess.cs
+++ b/src/DotPulsar/Internal/ProducerProcess.cs
@@ -15,7 +15,9 @@
namespace DotPulsar.Internal
{
using Abstractions;
+ using Events;
using System;
+ using System.Threading;
using System.Threading.Tasks;
public sealed class ProducerProcess : Process
@@ -23,30 +25,103 @@ namespace DotPulsar.Internal
private readonly IStateManager<ProducerState> _stateManager;
private readonly IEstablishNewChannel _producer;
+ // The following variables are only used when this is the process for parent producer.
+ private readonly IRegisterEvent _processManager;
+ private int _partitionsCount;
+ private int _connectedProducersCount;
+ private int _initialProducersCount;
+
+ // The following variables are only used for sub producer
+ private readonly Guid? _partitionedProducerId;
+
public ProducerProcess(
Guid correlationId,
IStateManager<ProducerState> stateManager,
- IEstablishNewChannel producer) : base(correlationId)
+ IEstablishNewChannel producer,
+ IRegisterEvent processManager,
+ Guid? partitionedProducerId = null) : base(correlationId)
{
_stateManager = stateManager;
_producer = producer;
+ _processManager = processManager;
+ _partitionedProducerId = partitionedProducerId;
}
public override async ValueTask DisposeAsync()
{
- _stateManager.SetState(ProducerState.Closed);
+ SetState(ProducerState.Closed);
CancellationTokenSource.Cancel();
+
await _producer.DisposeAsync().ConfigureAwait(false);
}
+ protected override void HandleExtend(IEvent e)
+ {
+ switch (e)
+ {
+ case PartitionedSubProducerStateChanged stateChanged:
+ switch (stateChanged.ProducerState)
+ {
+ case ProducerState.Closed:
+ _stateManager.SetState(ProducerState.Closed);
+ break;
+ case ProducerState.Connected:
+ Interlocked.Increment(ref _connectedProducersCount);
+ break;
+ case ProducerState.Disconnected:
+ // When the sub producer is initialized, the Disconnected event will be triggered.
+ // So we need to first subtract from _initialProducersCount.
+ if (_initialProducersCount == 0)
+ Interlocked.Decrement(ref _connectedProducersCount);
+ else
+ Interlocked.Decrement(ref _initialProducersCount);
+ break;
+ case ProducerState.Faulted:
+ _stateManager.SetState(ProducerState.Faulted);
+ break;
+ case ProducerState.PartiallyConnected: break;
+ default: throw new ArgumentOutOfRangeException();
+ }
+
+ break;
+ case UpdatePartitions updatePartitions:
+ if (updatePartitions.PartitionsCount == 0)
+ {
+ Interlocked.Exchange(ref _initialProducersCount, 1);
+ Interlocked.Exchange(ref _partitionsCount, 1);
+ }
+ else
+ {
+ Interlocked.Add(ref _initialProducersCount, (int) updatePartitions.PartitionsCount - _partitionsCount);
+ Interlocked.Exchange(ref _partitionsCount, (int) updatePartitions.PartitionsCount);
+ }
+
+ break;
+ }
+ }
+
protected override void CalculateState()
{
if (_stateManager.IsFinalState())
return;
+ if (!IsSubProducer()) // parent producer process
+ {
+ if (_connectedProducersCount <= 0)
+ _stateManager.SetState(ProducerState.Disconnected);
+ else if (_connectedProducersCount == _partitionsCount)
+ _stateManager.SetState(ProducerState.Connected);
+ else
+ _stateManager.SetState(ProducerState.PartiallyConnected);
+
+ if (_partitionsCount == 0)
+ _ = _producer.EstablishNewChannel(CancellationTokenSource.Token);
+ return;
+ }
+
if (ExecutorState == ExecutorState.Faulted)
{
- _stateManager.SetState(ProducerState.Faulted);
+ SetState(ProducerState.Faulted);
return;
}
@@ -54,13 +129,27 @@ namespace DotPulsar.Internal
{
case ChannelState.ClosedByServer:
case ChannelState.Disconnected:
- _stateManager.SetState(ProducerState.Disconnected);
+ SetState(ProducerState.Disconnected);
_ = _producer.EstablishNewChannel(CancellationTokenSource.Token);
return;
case ChannelState.Connected:
- _stateManager.SetState(ProducerState.Connected);
+ SetState(ProducerState.Connected);
return;
}
}
+
+ /// <summary>
+ /// Check if this is the sub producer process of the partitioned producer.
+ /// </summary>
+ private bool IsSubProducer()
+ => _partitionedProducerId.HasValue;
+
+ private void SetState(ProducerState state)
+ {
+ _stateManager.SetState(state);
+
+ if (IsSubProducer())
+ _processManager.Register(new PartitionedSubProducerStateChanged(_partitionedProducerId!.Value, state));
+ }
}
}
diff --git a/src/DotPulsar/Internal/PulsarClientFactory.cs b/src/DotPulsar/Internal/PulsarClientFactory.cs
new file mode 100644
index 0000000..fc58aa9
--- /dev/null
+++ b/src/DotPulsar/Internal/PulsarClientFactory.cs
@@ -0,0 +1,28 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace DotPulsar.Internal
+{
+ using Abstractions;
+ using DotPulsar.Abstractions;
+ using System;
+
+ public sealed class PulsarClientFactory
+ {
+ public static PulsarClient CreatePulsarClient(IConnectionPool connectionPool, ProcessManager processManager, IHandleException exceptionHandler, Uri serviceUrl)
+ {
+ return new PulsarClient(connectionPool, processManager, exceptionHandler, serviceUrl);
+ }
+ }
+}
diff --git a/src/DotPulsar/Internal/RequestResponseHandler.cs b/src/DotPulsar/Internal/RequestResponseHandler.cs
index 9680d4c..011882d 100644
--- a/src/DotPulsar/Internal/RequestResponseHandler.cs
+++ b/src/DotPulsar/Internal/RequestResponseHandler.cs
@@ -39,6 +39,7 @@ namespace DotPulsar.Internal
_getResponseIdentifier.Set(BaseCommand.Type.CloseConsumer, cmd => StandardRequest.WithConsumerId(cmd.CloseConsumer.RequestId, cmd.CloseConsumer.ConsumerId));
_getResponseIdentifier.Set(BaseCommand.Type.CloseProducer, cmd => StandardRequest.WithProducerId(cmd.CloseProducer.RequestId, cmd.CloseProducer.ProducerId));
_getResponseIdentifier.Set(BaseCommand.Type.LookupResponse, cmd => StandardRequest.WithRequestId(cmd.LookupTopicResponse.RequestId));
+ _getResponseIdentifier.Set(BaseCommand.Type.PartitionedMetadataResponse, cmd => StandardRequest.WithRequestId(cmd.PartitionMetadataResponse.RequestId));
_getResponseIdentifier.Set(BaseCommand.Type.GetLastMessageIdResponse, cmd => StandardRequest.WithRequestId(cmd.GetLastMessageIdResponse.RequestId));
_getResponseIdentifier.Set(BaseCommand.Type.GetOrCreateSchemaResponse, cmd => StandardRequest.WithRequestId(cmd.GetOrCreateSchemaResponse.RequestId));
_getResponseIdentifier.Set(BaseCommand.Type.Success, cmd => StandardRequest.WithRequestId(cmd.Success.RequestId));
@@ -105,6 +106,12 @@ namespace DotPulsar.Internal
return _requests.CreateTask(StandardRequest.WithRequestId(command.RequestId));
}
+ public Task<BaseCommand> Outgoing(CommandPartitionedTopicMetadata command)
+ {
+ command.RequestId = _requestId.FetchNext();
+ return _requests.CreateTask(StandardRequest.WithRequestId(command.RequestId));
+ }
+
public Task<BaseCommand> Outgoing(CommandSeek command)
{
command.RequestId = _requestId.FetchNext();
diff --git a/src/DotPulsar/Internal/Producer.cs b/src/DotPulsar/Internal/SubProducer.cs
similarity index 98%
copy from src/DotPulsar/Internal/Producer.cs
copy to src/DotPulsar/Internal/SubProducer.cs
index b130318..5d15b0e 100644
--- a/src/DotPulsar/Internal/Producer.cs
+++ b/src/DotPulsar/Internal/SubProducer.cs
@@ -25,7 +25,7 @@ namespace DotPulsar.Internal
using System.Threading;
using System.Threading.Tasks;
- public sealed class Producer<TMessage> : IEstablishNewChannel, IProducer<TMessage>
+ public sealed class SubProducer<TMessage> : IEstablishNewChannel, IProducer<TMessage>
{
private readonly ObjectPool<PulsarApi.MessageMetadata> _messageMetadataPool;
private readonly Guid _correlationId;
@@ -41,7 +41,7 @@ namespace DotPulsar.Internal
public Uri ServiceUrl { get; }
public string Topic { get; }
- public Producer(
+ public SubProducer(
Guid correlationId,
Uri serviceUrl,
string topic,
@@ -91,7 +91,6 @@ namespace DotPulsar.Internal
await _channel.ClosedByClient(CancellationToken.None).ConfigureAwait(false);
await _channel.DisposeAsync().ConfigureAwait(false);
}
-
public async ValueTask<MessageId> Send(TMessage message, CancellationToken cancellationToken)
=> await Send(_schema.Encode(message), cancellationToken).ConfigureAwait(false);
diff --git a/src/DotPulsar/ProducerOptions.cs b/src/DotPulsar/ProducerOptions.cs
index 958a1f8..3e8cd0e 100644
--- a/src/DotPulsar/ProducerOptions.cs
+++ b/src/DotPulsar/ProducerOptions.cs
@@ -40,6 +40,7 @@ namespace DotPulsar
InitialSequenceId = DefaultInitialSequenceId;
Topic = topic;
Schema = schema;
+ MessageRouter = new RoundRobinPartitionRouter();
}
/// <summary>
@@ -71,5 +72,10 @@ namespace DotPulsar
/// Set the topic for this producer. This is required.
/// </summary>
public string Topic { get; set; }
+
+ /// <summary>
+ /// Set the message router. The default router is Round Robin partition router.
+ /// </summary>
+ public IMessageRouter MessageRouter { get; set; }
}
}
diff --git a/src/DotPulsar/ProducerState.cs b/src/DotPulsar/ProducerState.cs
index 5bab409..da216c0 100644
--- a/src/DotPulsar/ProducerState.cs
+++ b/src/DotPulsar/ProducerState.cs
@@ -37,6 +37,11 @@ namespace DotPulsar
/// <summary>
/// The producer is faulted. This is a final state.
/// </summary>
- Faulted
+ Faulted,
+
+ /// <summary>
+ /// Some of the sub-producers are disconnected.
+ /// </summary>
+ PartiallyConnected
}
}
diff --git a/src/DotPulsar/PulsarClient.cs b/src/DotPulsar/PulsarClient.cs
index 678ac83..5415485 100644
--- a/src/DotPulsar/PulsarClient.cs
+++ b/src/DotPulsar/PulsarClient.cs
@@ -20,6 +20,7 @@ namespace DotPulsar
using Exceptions;
using Internal;
using Internal.Abstractions;
+ using Internal.Extensions;
using System;
using System.Linq;
using System.Threading;
@@ -57,6 +58,20 @@ namespace DotPulsar
public static IPulsarClientBuilder Builder()
=> new PulsarClientBuilder();
+ internal async Task<uint> GetNumberOfPartitions(string topic, CancellationToken cancellationToken)
+ {
+ var connection = await _connectionPool.FindConnectionForTopic(topic, cancellationToken).ConfigureAwait(false);
+ var commandPartitionedMetadata = new CommandPartitionedTopicMetadata() { Topic = topic };
+ var response = await connection.Send(commandPartitionedMetadata, cancellationToken).ConfigureAwait(false);
+
+ response.Expect(BaseCommand.Type.PartitionedMetadataResponse);
+
+ if (response.PartitionMetadataResponse.Response == CommandPartitionedTopicMetadataResponse.LookupType.Failed)
+ response.PartitionMetadataResponse.Throw();
+
+ return response.PartitionMetadataResponse.Partitions;
+ }
+
/// <summary>
/// Create a producer.
/// </summary>
@@ -64,19 +79,47 @@ namespace DotPulsar
{
ThrowIfDisposed();
+ var correlationId = Guid.NewGuid();
+ var executor = new Executor(correlationId, _processManager, _exceptionHandler);
+ var stateManager = new StateManager<ProducerState>(ProducerState.Disconnected, ProducerState.Closed, ProducerState.Faulted);
+
+ var producer = new Producer<TMessage>(correlationId, ServiceUrl, options.Topic, _processManager, executor, stateManager, options, this);
+
+ if (options.StateChangedHandler is not null)
+ _ = StateMonitor.MonitorProducer(producer, options.StateChangedHandler);
+ var process = new ProducerProcess(correlationId, stateManager, producer, _processManager);
+ _processManager.Add(process);
+ process.Start();
+ return producer;
+ }
+
+ /// <summary>
+ /// Create a producer internally.
+ /// This method is used to create internal producers for partitioned producer.
+ /// </summary>
+ internal SubProducer<TMessage> NewSubProducer<TMessage>(string topic, ProducerOptions<TMessage> options, IExecute executor, Guid partitionedProducerGuid,
+ uint? partitionIndex = null)
+ {
+ ThrowIfDisposed();
+
ICompressorFactory? compressorFactory = null;
+ if (partitionIndex.HasValue)
+ {
+ topic = $"{topic}-partition-{partitionIndex}";
+ }
+
if (options.CompressionType != CompressionType.None)
{
var compressionType = (Internal.PulsarApi.CompressionType) options.CompressionType;
compressorFactory = CompressionFactories.CompressorFactories().SingleOrDefault(f => f.CompressionType == compressionType);
+
if (compressorFactory is null)
throw new CompressionException($"Support for {compressionType} compression was not found");
}
var correlationId = Guid.NewGuid();
- var executor = new Executor(correlationId, _processManager, _exceptionHandler);
- var topic = options.Topic;
+
var producerName = options.ProducerName;
var schema = options.Schema;
var initialSequenceId = options.InitialSequenceId;
@@ -84,10 +127,11 @@ namespace DotPulsar
var factory = new ProducerChannelFactory(correlationId, _processManager, _connectionPool, topic, producerName, schema.SchemaInfo, compressorFactory);
var stateManager = new StateManager<ProducerState>(ProducerState.Disconnected, ProducerState.Closed, ProducerState.Faulted);
var initialChannel = new NotReadyChannel<TMessage>();
- var producer = new Producer<TMessage>(correlationId, ServiceUrl, topic, initialSequenceId, _processManager, initialChannel, executor, stateManager, factory, schema);
- if (options.StateChangedHandler is not null)
+ var producer = new SubProducer<TMessage>(correlationId, ServiceUrl, topic, initialSequenceId, _processManager, initialChannel, executor, stateManager, factory, schema);
+
+ if (options.StateChangedHandler is not null && !partitionIndex.HasValue) // the StateChangeHandler of the sub producers in partitioned producers should be disabled.
_ = StateMonitor.MonitorProducer(producer, options.StateChangedHandler);
- var process = new ProducerProcess(correlationId, stateManager, producer);
+ var process = new ProducerProcess(correlationId, stateManager, producer, _processManager, partitionedProducerGuid);
_processManager.Add(process);
process.Start();
return producer;
diff --git a/src/DotPulsar/RoundRobinPartitionRouter.cs b/src/DotPulsar/RoundRobinPartitionRouter.cs
new file mode 100644
index 0000000..05595d6
--- /dev/null
+++ b/src/DotPulsar/RoundRobinPartitionRouter.cs
@@ -0,0 +1,46 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace DotPulsar
+{
+ using Abstractions;
+ using HashDepot;
+ using System.Text;
+ using System.Threading;
+
+ /// <summary>
+ /// Round robin partition messages router
+ /// If no key is provided, the producer will publish messages across all partitions in round-robin fashion
+ /// to achieve maximum throughput. While if a key is specified on the message, the partitioned producer will
+ /// hash the key and assign message to a particular partition.
+ /// This is the default mode.
+ /// </summary>
+ public sealed class RoundRobinPartitionRouter : IMessageRouter
+ {
+ private int _partitionIndex = -1;
+
+ /// <summary>
+ /// Choose a partition in round robin routig mode
+ /// </summary>
+ public int ChoosePartition(MessageMetadata? messageMetadata, int partitionsCount)
+ {
+ if (messageMetadata != null && !string.IsNullOrEmpty(messageMetadata.Key))
+ {
+ return (int) MurmurHash3.Hash32(Encoding.UTF8.GetBytes(messageMetadata.Key ?? string.Empty), 0) % partitionsCount;
+ }
+
+ return Interlocked.Increment(ref _partitionIndex) % partitionsCount;
+ }
+ }
+}
diff --git a/src/DotPulsar/SinglePartitionRouter.cs b/src/DotPulsar/SinglePartitionRouter.cs
new file mode 100644
index 0000000..3a2b563
--- /dev/null
+++ b/src/DotPulsar/SinglePartitionRouter.cs
@@ -0,0 +1,51 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace DotPulsar
+{
+ using Abstractions;
+ using HashDepot;
+ using System;
+ using System.Text;
+
+ /// <summary>
+ /// If no key is provided, the producer will randomly pick one single partition and publish all the messages
+ /// into that partition. While if a key is specified on the message, the partitioned producer will hash the
+ /// key and assign message to a particular partition.
+ /// </summary>
+ public sealed class SinglePartitionRouter : IMessageRouter
+ {
+ private int? _partitionIndex;
+
+ internal SinglePartitionRouter(int? partitionIndex = null)
+ {
+ _partitionIndex = partitionIndex;
+ }
+
+ /// <summary>
+ /// Choose a partition in single partition routing mode
+ /// </summary>
+ public int ChoosePartition(MessageMetadata? messageMetadata, int partitionsCount)
+ {
+ if (messageMetadata != null && !string.IsNullOrEmpty(messageMetadata.Key))
+ {
+ return (int) MurmurHash3.Hash32(Encoding.UTF8.GetBytes(messageMetadata.Key ?? string.Empty), 0) % partitionsCount;
+ }
+
+ _partitionIndex ??= new Random().Next(0, partitionsCount);
+
+ return _partitionIndex.Value;
+ }
+ }
+}
diff --git a/tests/DotPulsar.Tests/DotPulsar.Tests.csproj b/tests/DotPulsar.Tests/DotPulsar.Tests.csproj
index ca0bf24..f782fc3 100644
--- a/tests/DotPulsar.Tests/DotPulsar.Tests.csproj
+++ b/tests/DotPulsar.Tests/DotPulsar.Tests.csproj
@@ -7,6 +7,7 @@
<ItemGroup>
<PackageReference Include="FluentAssertions" Version="5.10.3" />
+ <PackageReference Include="NSubstitute" Version="4.2.2" />
<PackageReference Include="Microsoft.NET.Test.Sdk" Version="16.10.0" />
<PackageReference Include="xunit" Version="2.4.1" />
<PackageReference Include="xunit.runner.visualstudio" Version="2.4.3">
diff --git a/tests/DotPulsar.Tests/Internal/PartitionedProducerProcessTests.cs b/tests/DotPulsar.Tests/Internal/PartitionedProducerProcessTests.cs
new file mode 100644
index 0000000..255b87a
--- /dev/null
+++ b/tests/DotPulsar.Tests/Internal/PartitionedProducerProcessTests.cs
@@ -0,0 +1,111 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace DotPulsar.Tests.Internal
+{
+ using Abstractions;
+ using DotPulsar.Internal;
+ using DotPulsar.Internal.Abstractions;
+ using DotPulsar.Internal.Events;
+ using NSubstitute;
+ using System;
+ using System.Collections.Concurrent;
+ using System.Collections.Generic;
+ using Xunit;
+
+ public class PartitionedProducerProcessTests
+ {
+ [Fact]
+ public void TestPartitionedProducerStateManage_WhenSubProducersStateChange_ThenPartitionedProducerStateChangeCorrectly()
+ {
+ var connectionPool = Substitute.For<IConnectionPool>();
+ var establishNewChannel = Substitute.For<IEstablishNewChannel>();
+ var producer = Substitute.For<IProducer>();
+
+ var processManager = new ProcessManager(connectionPool);
+
+ var producerGuids = new Dictionary<uint, Guid>(3);
+ var producersGroup = new ConcurrentDictionary<uint, IProducer>(Environment.ProcessorCount, 3);
+ var partitionedProducerGuid = Guid.NewGuid();
+
+ for (uint i = 0; i < 3; i++)
+ {
+ var stateManager = new StateManager<ProducerState>(ProducerState.Disconnected, ProducerState.Closed, ProducerState.Faulted);
+ var correlationId = Guid.NewGuid();
+ var process = new ProducerProcess(correlationId, stateManager, establishNewChannel, processManager, partitionedProducerGuid);
+ producerGuids[i] = correlationId;
+ producersGroup[i] = producer;
+ processManager.Add(process);
+ }
+
+ var partitionedStateManager =
+ new StateManager<ProducerState>(ProducerState.Disconnected, ProducerState.Closed, ProducerState.Faulted);
+
+ var producerProcess = new ProducerProcess(partitionedProducerGuid, partitionedStateManager, establishNewChannel, new ProcessManager(connectionPool));
+ processManager.Add(producerProcess);
+ processManager.Register(new UpdatePartitions(partitionedProducerGuid, (uint) producersGroup.Count));
+
+ // Test initial channel
+ processManager.Register(new ChannelDisconnected(producerGuids[0]));
+ Assert.Equal(ProducerState.Disconnected, partitionedStateManager.CurrentState);
+ processManager.Register(new ChannelDisconnected(producerGuids[1]));
+ Assert.Equal(ProducerState.Disconnected, partitionedStateManager.CurrentState);
+ processManager.Register(new ChannelDisconnected(producerGuids[2]));
+ Assert.Equal(ProducerState.Disconnected, partitionedStateManager.CurrentState);
+
+ // Test connect
+ Assert.Equal(ProducerState.Disconnected, partitionedStateManager.CurrentState);
+ processManager.Register(new ChannelConnected(producerGuids[0]));
+ Assert.Equal(ProducerState.PartiallyConnected, partitionedStateManager.CurrentState);
+ processManager.Register(new ChannelConnected(producerGuids[1]));
+ Assert.Equal(ProducerState.PartiallyConnected, partitionedStateManager.CurrentState);
+ processManager.Register(new ChannelConnected(producerGuids[2]));
+ Assert.Equal(ProducerState.Connected, partitionedStateManager.CurrentState);
+
+ // Test disconnect
+ processManager.Register(new ChannelDisconnected(producerGuids[1]));
+ Assert.Equal(ProducerState.PartiallyConnected, partitionedStateManager.CurrentState);
+
+ // Test reconnect
+ processManager.Register(new ChannelConnected(producerGuids[1]));
+ Assert.Equal(ProducerState.Connected, partitionedStateManager.CurrentState);
+
+ // Test fault
+ processManager.Register(new ExecutorFaulted(producerGuids[1]));
+ Assert.Equal(ProducerState.Faulted, partitionedStateManager.CurrentState);
+ }
+
+ [Fact]
+ public void TestUpdatePartitions_WhenIncreasePartitions_ThenPartitionedProducerStateChangeCorrectly()
+ {
+ var connectionPool = Substitute.For<IConnectionPool>();
+ var processManager = new ProcessManager(connectionPool);
+ var establishNewChannel = Substitute.For<IEstablishNewChannel>();
+
+ var guid = Guid.NewGuid();
+ var stateManager = new StateManager<ProducerState>(ProducerState.Disconnected, ProducerState.Closed, ProducerState.Faulted);
+ var process = new ProducerProcess(guid, stateManager, establishNewChannel, new ProcessManager(connectionPool));
+ processManager.Add(process);
+ processManager.Register(new UpdatePartitions(guid, 1));
+
+ Assert.Equal(ProducerState.Disconnected, stateManager.CurrentState);
+ processManager.Register(new PartitionedSubProducerStateChanged(guid, ProducerState.Connected));
+ Assert.Equal(ProducerState.Connected, stateManager.CurrentState);
+ processManager.Register(new UpdatePartitions(guid, 2));
+ Assert.Equal(ProducerState.PartiallyConnected, stateManager.CurrentState);
+ processManager.Register(new PartitionedSubProducerStateChanged(guid, ProducerState.Connected));
+ Assert.Equal(ProducerState.Connected, stateManager.CurrentState);
+ }
+ }
+}
diff --git a/tests/DotPulsar.Tests/PulsarClientTests.cs b/tests/DotPulsar.Tests/PulsarClientTests.cs
new file mode 100644
index 0000000..67370d8
--- /dev/null
+++ b/tests/DotPulsar.Tests/PulsarClientTests.cs
@@ -0,0 +1,75 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace DotPulsar.Tests
+{
+ using Abstractions;
+ using DotPulsar.Internal;
+ using DotPulsar.Internal.Abstractions;
+ using DotPulsar.Internal.PulsarApi;
+ using Extensions;
+ using NSubstitute;
+ using System;
+ using System.Threading;
+ using System.Threading.Tasks;
+ using Xunit;
+ using Schema = Schema;
+
+ public class PulsarClientTests
+ {
+ [Fact]
+ public async Task GetPartitionedProducer_GivenPartitionedTopic_ShouldReturnPartitionProducer()
+ {
+ //Arrange
+ var topicName = "persistent://public/default/test-topic";
+ uint expectedPartitions = 3;
+
+ var connection = Substitute.For<IConnection>();
+
+ // use saveGetPartitions to assert CommandPartitionedTopicMetadata.
+ CommandPartitionedTopicMetadata? saveGetPartitions = null;
+
+ connection.Send(Arg.Any<CommandPartitionedTopicMetadata>(), Arg.Any<CancellationToken>())
+ .Returns(new BaseCommand()
+ {
+ CommandType = BaseCommand.Type.PartitionedMetadataResponse,
+ PartitionMetadataResponse = new CommandPartitionedTopicMetadataResponse()
+ {
+ Response = CommandPartitionedTopicMetadataResponse.LookupType.Success, Partitions = expectedPartitions
+ }
+ })
+ .AndDoes(info =>
+ {
+ saveGetPartitions = (CommandPartitionedTopicMetadata) info[0];
+ });
+
+ var connectionPool = Substitute.For<IConnectionPool>();
+
+ connectionPool.FindConnectionForTopic(Arg.Any<string>(), Arg.Any<CancellationToken>())
+ .Returns(connection);
+
+ var client = PulsarClientFactory.CreatePulsarClient(connectionPool, new ProcessManager(connectionPool), Substitute.For<IHandleException>(), new Uri
+ ("pusarl://localhost:6650/"));
+
+ //Act
+ await using var producer = client.NewProducer(Schema.String).Topic(topicName).Create();
+ await ((IEstablishNewChannel) producer).EstablishNewChannel(new CancellationTokenSource(TimeSpan.FromSeconds(10)).Token);
+
+ //Assert
+ Assert.NotNull(saveGetPartitions);
+ Assert.Equal(saveGetPartitions?.Topic, topicName);
+ Assert.IsType<Producer<string>>(producer);
+ }
+ }
+}