You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by bl...@apache.org on 2021/06/21 19:33:53 UTC

[pulsar-dotpulsar] branch master updated: Basic partitioned producer (#71)

This is an automated email from the ASF dual-hosted git repository.

blankensteiner pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-dotpulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 85c2f70  Basic partitioned producer (#71)
85c2f70 is described below

commit 85c2f70b545e7398dbfd3707e5ffc7831303b47e
Author: Zike Yang <ar...@armail.top>
AuthorDate: Tue Jun 22 03:33:46 2021 +0800

    Basic partitioned producer (#71)
    
    * Add GetPartitions.
    
    * Add PartitionedProducerProcess.
    
    * Add PartitionedProducerProcess Test.
    
    * Remove PartitionedProducerState.
    
    * Add UpdatePartitions Event.
    
    * Implement PartitionedProducer.
    
    * Refactor CreateProducer
    
    * Fix sub topic format error
    
    * Fix some problems
    
    * Remove PartitionedProducerProcess.
    
    * Make some classes sealed.
    
    * Use await in PartitionedProducer.Send
    
    * Make PulsarClient internal and fix tests.
    
    * Make GetNumberOfPartitions private.
    
    * Fix test method name.
    
    * Use NSubstitute instead of Moq.
    
    * Make CreateSubProducers sync.
    
    * Fix some comments.
    
    * Fix warnings
    
    * Subproducers
    
    * Update
    
    * Change PartitionedProducer to Producer and add some comments.
    
    * Add UpdatePartitions to connect logic and fix some other problems.
    
    * Update Partitions when start process.
    
    * Fix test warnings.
---
 samples/Producing/Program.cs                       |   1 +
 .../IMessageRouter.cs}                             |  25 +---
 src/DotPulsar/DotPulsar.csproj                     |   3 +-
 src/DotPulsar/Internal/Abstractions/IConnection.cs |   1 +
 src/DotPulsar/Internal/Abstractions/Process.cs     |   6 +
 src/DotPulsar/Internal/ChannelManager.cs           |   3 +
 src/DotPulsar/Internal/Connection.cs               |  16 +++
 .../Events/PartitionedSubProducerStateChanged.cs}  |  34 ++---
 .../Events/UpdatePartitions.cs}                    |  33 ++---
 .../Internal/Exceptions/LookupNotReadyException.cs |  23 ++++
 .../Internal/Extensions/CommandExtensions.cs       |   9 ++
 src/DotPulsar/Internal/Producer.cs                 | 142 +++++++++------------
 src/DotPulsar/Internal/ProducerBuilder.cs          |   9 ++
 src/DotPulsar/Internal/ProducerProcess.cs          |  99 +++++++++++++-
 src/DotPulsar/Internal/PulsarClientFactory.cs      |  28 ++++
 src/DotPulsar/Internal/RequestResponseHandler.cs   |   7 +
 .../Internal/{Producer.cs => SubProducer.cs}       |   5 +-
 src/DotPulsar/ProducerOptions.cs                   |   6 +
 src/DotPulsar/ProducerState.cs                     |   7 +-
 src/DotPulsar/PulsarClient.cs                      |  54 +++++++-
 src/DotPulsar/RoundRobinPartitionRouter.cs         |  46 +++++++
 src/DotPulsar/SinglePartitionRouter.cs             |  51 ++++++++
 tests/DotPulsar.Tests/DotPulsar.Tests.csproj       |   1 +
 .../Internal/PartitionedProducerProcessTests.cs    | 111 ++++++++++++++++
 tests/DotPulsar.Tests/PulsarClientTests.cs         |  75 +++++++++++
 25 files changed, 638 insertions(+), 157 deletions(-)

diff --git a/samples/Producing/Program.cs b/samples/Producing/Program.cs
index 5ea9a51..9c2ef0e 100644
--- a/samples/Producing/Program.cs
+++ b/samples/Producing/Program.cs
@@ -71,6 +71,7 @@ namespace Producing
             {
                 ProducerState.Connected => "is connected",
                 ProducerState.Disconnected => "is disconnected",
+                ProducerState.PartiallyConnected => "has partially connected",
                 ProducerState.Closed => "has closed",
                 ProducerState.Faulted => "has faulted",
                 _ => $"has an unknown state '{stateChanged.ProducerState}'"
diff --git a/src/DotPulsar/ProducerState.cs b/src/DotPulsar/Abstractions/IMessageRouter.cs
similarity index 56%
copy from src/DotPulsar/ProducerState.cs
copy to src/DotPulsar/Abstractions/IMessageRouter.cs
index 5bab409..6652c9c 100644
--- a/src/DotPulsar/ProducerState.cs
+++ b/src/DotPulsar/Abstractions/IMessageRouter.cs
@@ -12,31 +12,16 @@
  * limitations under the License.
  */
 
-namespace DotPulsar
+namespace DotPulsar.Abstractions
 {
     /// <summary>
-    /// The possible states a producer can be in.
+    /// A message routing abstraction
     /// </summary>
-    public enum ProducerState : byte
+    public interface IMessageRouter
     {
         /// <summary>
-        /// The producer is closed. This is a final state.
+        /// Choose a partition.
         /// </summary>
-        Closed,
-
-        /// <summary>
-        /// The producer is connected.
-        /// </summary>
-        Connected,
-
-        /// <summary>
-        /// The producer is disconnected.
-        /// </summary>
-        Disconnected,
-
-        /// <summary>
-        /// The producer is faulted. This is a final state.
-        /// </summary>
-        Faulted
+        int ChoosePartition(MessageMetadata? messageMetadata, int partitionsCount);
     }
 }
diff --git a/src/DotPulsar/DotPulsar.csproj b/src/DotPulsar/DotPulsar.csproj
index 5538b9b..9cce4c3 100644
--- a/src/DotPulsar/DotPulsar.csproj
+++ b/src/DotPulsar/DotPulsar.csproj
@@ -22,7 +22,8 @@
   </PropertyGroup>
 
   <ItemGroup>    
-    <PackageReference Include="Microsoft.Extensions.ObjectPool" Version="5.0.6" />    
+    <PackageReference Include="HashDepot" Version="2.0.3" />
+    <PackageReference Include="Microsoft.Extensions.ObjectPool" Version="5.0.6" />
     <PackageReference Include="Microsoft.SourceLink.GitHub" Version="1.0.0" PrivateAssets="All" />
     <PackageReference Include="protobuf-net" Version="3.0.101" />
     <PackageReference Include="System.IO.Pipelines" Version="5.0.1" />
diff --git a/src/DotPulsar/Internal/Abstractions/IConnection.cs b/src/DotPulsar/Internal/Abstractions/IConnection.cs
index 368a111..f6c7092 100644
--- a/src/DotPulsar/Internal/Abstractions/IConnection.cs
+++ b/src/DotPulsar/Internal/Abstractions/IConnection.cs
@@ -41,5 +41,6 @@ namespace DotPulsar.Internal.Abstractions
         Task<BaseCommand> Send(CommandCloseConsumer command, CancellationToken cancellationToken);
         Task<BaseCommand> Send(SendPackage command, CancellationToken cancellationToken);
         Task<BaseCommand> Send(CommandGetOrCreateSchema command, CancellationToken cancellationToken);
+        Task<BaseCommand> Send(CommandPartitionedTopicMetadata command, CancellationToken cancellationToken);
     }
 }
diff --git a/src/DotPulsar/Internal/Abstractions/Process.cs b/src/DotPulsar/Internal/Abstractions/Process.cs
index 056471a..4c0d3d8 100644
--- a/src/DotPulsar/Internal/Abstractions/Process.cs
+++ b/src/DotPulsar/Internal/Abstractions/Process.cs
@@ -68,11 +68,17 @@ namespace DotPulsar.Internal.Abstractions
                 case ChannelUnsubscribed _:
                     ChannelState = ChannelState.Unsubscribed;
                     break;
+                default:
+                    HandleExtend(e);
+                    break;
             }
 
             CalculateState();
         }
 
         protected abstract void CalculateState();
+
+        protected virtual void HandleExtend(IEvent e)
+            => throw new NotImplementedException();
     }
 }
diff --git a/src/DotPulsar/Internal/ChannelManager.cs b/src/DotPulsar/Internal/ChannelManager.cs
index ed3fa51..8f61858 100644
--- a/src/DotPulsar/Internal/ChannelManager.cs
+++ b/src/DotPulsar/Internal/ChannelManager.cs
@@ -160,6 +160,9 @@ namespace DotPulsar.Internal
         public Task<BaseCommand> Outgoing(CommandLookupTopic command)
             => _requestResponseHandler.Outgoing(command);
 
+        public Task<BaseCommand> Outgoing(CommandPartitionedTopicMetadata command)
+            => _requestResponseHandler.Outgoing(command);
+
         public Task<BaseCommand> Outgoing(CommandSeek command)
         {
             using (TakeConsumerSenderLock(command.ConsumerId))
diff --git a/src/DotPulsar/Internal/Connection.cs b/src/DotPulsar/Internal/Connection.cs
index efc1b12..b7ebc52 100644
--- a/src/DotPulsar/Internal/Connection.cs
+++ b/src/DotPulsar/Internal/Connection.cs
@@ -239,6 +239,22 @@ namespace DotPulsar.Internal
             return await responseTask.ConfigureAwait(false);
         }
 
+        public async Task<BaseCommand> Send(CommandPartitionedTopicMetadata command, CancellationToken cancellationToken)
+        {
+            ThrowIfDisposed();
+
+            Task<BaseCommand>? responseTask;
+
+            using (await _lock.Lock(cancellationToken).ConfigureAwait(false))
+            {
+                responseTask = _channelManager.Outgoing(command);
+                var sequence = Serializer.Serialize(command.AsBaseCommand());
+                await _stream.Send(sequence).ConfigureAwait(false);
+            }
+
+            return await responseTask.ConfigureAwait(false);
+        }
+
         private async Task Send(BaseCommand command, CancellationToken cancellationToken)
         {
             ThrowIfDisposed();
diff --git a/src/DotPulsar/ProducerState.cs b/src/DotPulsar/Internal/Events/PartitionedSubProducerStateChanged.cs
similarity index 52%
copy from src/DotPulsar/ProducerState.cs
copy to src/DotPulsar/Internal/Events/PartitionedSubProducerStateChanged.cs
index 5bab409..326641d 100644
--- a/src/DotPulsar/ProducerState.cs
+++ b/src/DotPulsar/Internal/Events/PartitionedSubProducerStateChanged.cs
@@ -12,31 +12,23 @@
  * limitations under the License.
  */
 
-namespace DotPulsar
+namespace DotPulsar.Internal.Events
 {
+    using Abstractions;
+    using System;
+
     /// <summary>
-    /// The possible states a producer can be in.
+    /// Representation of a state change of a sub producer of a partitioned producer.
     /// </summary>
-    public enum ProducerState : byte
+    public sealed class PartitionedSubProducerStateChanged : IEvent
     {
-        /// <summary>
-        /// The producer is closed. This is a final state.
-        /// </summary>
-        Closed,
-
-        /// <summary>
-        /// The producer is connected.
-        /// </summary>
-        Connected,
-
-        /// <summary>
-        /// The producer is disconnected.
-        /// </summary>
-        Disconnected,
+        public Guid CorrelationId { get; }
+        public ProducerState ProducerState { get; }
 
-        /// <summary>
-        /// The producer is faulted. This is a final state.
-        /// </summary>
-        Faulted
+        public PartitionedSubProducerStateChanged(Guid correlationId, ProducerState producerState)
+        {
+            CorrelationId = correlationId;
+            ProducerState = producerState;
+        }
     }
 }
diff --git a/src/DotPulsar/ProducerState.cs b/src/DotPulsar/Internal/Events/UpdatePartitions.cs
similarity index 52%
copy from src/DotPulsar/ProducerState.cs
copy to src/DotPulsar/Internal/Events/UpdatePartitions.cs
index 5bab409..b9894b5 100644
--- a/src/DotPulsar/ProducerState.cs
+++ b/src/DotPulsar/Internal/Events/UpdatePartitions.cs
@@ -12,31 +12,24 @@
  * limitations under the License.
  */
 
-namespace DotPulsar
+namespace DotPulsar.Internal.Events
 {
+    using Abstractions;
+    using System;
+
     /// <summary>
-    /// The possible states a producer can be in.
+    /// Representation of an update to the partitions count of a partitioned topic.
     /// </summary>
-    public enum ProducerState : byte
+    public sealed class UpdatePartitions : IEvent
     {
-        /// <summary>
-        /// The producer is closed. This is a final state.
-        /// </summary>
-        Closed,
-
-        /// <summary>
-        /// The producer is connected.
-        /// </summary>
-        Connected,
+        public Guid CorrelationId { get; }
 
-        /// <summary>
-        /// The producer is disconnected.
-        /// </summary>
-        Disconnected,
+        public uint PartitionsCount { get; }
 
-        /// <summary>
-        /// The producer is faulted. This is a final state.
-        /// </summary>
-        Faulted
+        public UpdatePartitions(Guid correlationId, uint partitionsCount)
+        {
+            CorrelationId = correlationId;
+            PartitionsCount = partitionsCount;
+        }
     }
 }
diff --git a/src/DotPulsar/Internal/Exceptions/LookupNotReadyException.cs b/src/DotPulsar/Internal/Exceptions/LookupNotReadyException.cs
new file mode 100644
index 0000000..13fc562
--- /dev/null
+++ b/src/DotPulsar/Internal/Exceptions/LookupNotReadyException.cs
@@ -0,0 +1,23 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace DotPulsar.Internal.Exceptions
+{
+    using DotPulsar.Exceptions;
+
+    public sealed class LookupNotReadyException : DotPulsarException
+    {
+        public LookupNotReadyException() : base("The topic lookup operation is not ready yet") { }
+    }
+}
diff --git a/src/DotPulsar/Internal/Extensions/CommandExtensions.cs b/src/DotPulsar/Internal/Extensions/CommandExtensions.cs
index 0a76401..ffa9295 100644
--- a/src/DotPulsar/Internal/Extensions/CommandExtensions.cs
+++ b/src/DotPulsar/Internal/Extensions/CommandExtensions.cs
@@ -46,6 +46,9 @@ namespace DotPulsar.Internal.Extensions
         public static void Throw(this CommandGetOrCreateSchemaResponse command)
             => Throw(command.ErrorCode, command.ErrorMessage);
 
+        public static void Throw(this CommandPartitionedTopicMetadataResponse command)
+            => Throw(command.Error, command.Message);
+
         private static void Throw(ServerError error, string message)
             => throw (error switch
             {
@@ -187,5 +190,11 @@ namespace DotPulsar.Internal.Extensions
                 CommandType = BaseCommand.Type.GetOrCreateSchema,
                 GetOrCreateSchema = command
             };
+
+        public static BaseCommand AsBaseCommand(this CommandPartitionedTopicMetadata command)
+            => new BaseCommand
+            {
+                CommandType = BaseCommand.Type.PartitionedMetadata, PartitionMetadata = command
+            };
     }
 }
diff --git a/src/DotPulsar/Internal/Producer.cs b/src/DotPulsar/Internal/Producer.cs
index b130318..d3c3495 100644
--- a/src/DotPulsar/Internal/Producer.cs
+++ b/src/DotPulsar/Internal/Producer.cs
@@ -16,28 +16,26 @@ namespace DotPulsar.Internal
 {
     using Abstractions;
     using DotPulsar.Abstractions;
-    using DotPulsar.Exceptions;
-    using DotPulsar.Internal.Extensions;
     using Events;
-    using Microsoft.Extensions.ObjectPool;
+    using Exceptions;
     using System;
-    using System.Buffers;
+    using System.Collections.Concurrent;
     using System.Threading;
     using System.Threading.Tasks;
 
     public sealed class Producer<TMessage> : IEstablishNewChannel, IProducer<TMessage>
     {
-        private readonly ObjectPool<PulsarApi.MessageMetadata> _messageMetadataPool;
         private readonly Guid _correlationId;
         private readonly IRegisterEvent _eventRegister;
-        private IProducerChannel _channel;
         private readonly IExecute _executor;
         private readonly IStateChanged<ProducerState> _state;
-        private readonly IProducerChannelFactory _factory;
-        private readonly ISchema<TMessage> _schema;
-        private readonly SequenceId _sequenceId;
+        private readonly PulsarClient _pulsarClient;
+        private readonly ProducerOptions<TMessage> _options;
+        private readonly ConcurrentDictionary<int, IProducer<TMessage>> _producers;
+        private readonly IMessageRouter _messageRouter;
+        private readonly CancellationTokenSource _cts = new();
+        private int _producersCount;
         private int _isDisposed;
-
         public Uri ServiceUrl { get; }
         public string Topic { get; }
 
@@ -45,36 +43,50 @@ namespace DotPulsar.Internal
             Guid correlationId,
             Uri serviceUrl,
             string topic,
-            ulong initialSequenceId,
             IRegisterEvent registerEvent,
-            IProducerChannel initialChannel,
             IExecute executor,
             IStateChanged<ProducerState> state,
-            IProducerChannelFactory factory,
-            ISchema<TMessage> schema)
+            ProducerOptions<TMessage> options,
+            PulsarClient pulsarClient
+        )
         {
-            var messageMetadataPolicy = new DefaultPooledObjectPolicy<PulsarApi.MessageMetadata>();
-            _messageMetadataPool = new DefaultObjectPool<PulsarApi.MessageMetadata>(messageMetadataPolicy);
             _correlationId = correlationId;
             ServiceUrl = serviceUrl;
             Topic = topic;
-            _sequenceId = new SequenceId(initialSequenceId);
             _eventRegister = registerEvent;
-            _channel = initialChannel;
             _executor = executor;
             _state = state;
-            _factory = factory;
-            _schema = schema;
             _isDisposed = 0;
+            _options = options;
+            _pulsarClient = pulsarClient;
+            _messageRouter = options.MessageRouter;
 
-            _eventRegister.Register(new ProducerCreated(_correlationId));
+            _producers = new ConcurrentDictionary<int, IProducer<TMessage>>(1, 31);
         }
 
-        public async ValueTask<ProducerState> OnStateChangeTo(ProducerState state, CancellationToken cancellationToken)
-            => await _state.StateChangedTo(state, cancellationToken).ConfigureAwait(false);
+        private void CreateSubProducers(int startIndex, int count)
+        {
+            if (count == 0)
+            {
+                var producer = _pulsarClient.NewSubProducer(Topic, _options, _executor, _correlationId);
+                _producers[0] = producer;
+                return;
+            }
 
-        public async ValueTask<ProducerState> OnStateChangeFrom(ProducerState state, CancellationToken cancellationToken)
-            => await _state.StateChangedFrom(state, cancellationToken).ConfigureAwait(false);
+            for (var i = startIndex; i < count; ++i)
+            {
+                var producer = _pulsarClient.NewSubProducer(Topic, _options, _executor, _correlationId, (uint) i);
+                _producers[i] = producer;
+            }
+        }
+
+        private async Task UpdatePartitions(CancellationToken cancellationToken)
+        {
+            var partitionsCount = (int) await _pulsarClient.GetNumberOfPartitions(Topic, cancellationToken).ConfigureAwait(false);
+            _eventRegister.Register(new UpdatePartitions(_correlationId, (uint)partitionsCount));
+            CreateSubProducers(_producers.Count, partitionsCount);
+            _producersCount = partitionsCount;
+        }
 
         public bool IsFinalState()
             => _state.IsFinalState();
@@ -82,78 +94,46 @@ namespace DotPulsar.Internal
         public bool IsFinalState(ProducerState state)
             => _state.IsFinalState(state);
 
+        public async ValueTask<ProducerState> OnStateChangeTo(ProducerState state, CancellationToken cancellationToken = default)
+            => await _state.StateChangedTo(state, cancellationToken).ConfigureAwait(false);
+
+        public async ValueTask<ProducerState> OnStateChangeFrom(ProducerState state, CancellationToken cancellationToken = default)
+            => await _state.StateChangedFrom(state, cancellationToken).ConfigureAwait(false);
+
         public async ValueTask DisposeAsync()
         {
             if (Interlocked.Exchange(ref _isDisposed, 1) != 0)
                 return;
 
-            _eventRegister.Register(new ProducerDisposed(_correlationId));
-            await _channel.ClosedByClient(CancellationToken.None).ConfigureAwait(false);
-            await _channel.DisposeAsync().ConfigureAwait(false);
-        }
-
-        public async ValueTask<MessageId> Send(TMessage message, CancellationToken cancellationToken)
-            => await Send(_schema.Encode(message), cancellationToken).ConfigureAwait(false);
+            _cts.Cancel();
+            _cts.Dispose();
 
-        public async ValueTask<MessageId> Send(MessageMetadata metadata, TMessage message, CancellationToken cancellationToken)
-            => await Send(metadata, _schema.Encode(message), cancellationToken).ConfigureAwait(false);
-
-        public async ValueTask<MessageId> Send(ReadOnlySequence<byte> data, CancellationToken cancellationToken)
-        {
-            ThrowIfDisposed();
-
-            var metadata = _messageMetadataPool.Get();
-            try
-            {
-                metadata.SequenceId = _sequenceId.FetchNext();
-                return await _executor.Execute(() => Send(metadata, data, cancellationToken), cancellationToken).ConfigureAwait(false);
-            }
-            finally
+            foreach (var producer in _producers.Values)
             {
-                _messageMetadataPool.Return(metadata);
+                await producer.DisposeAsync().ConfigureAwait(false);
             }
-        }
-
-        public async ValueTask<MessageId> Send(MessageMetadata metadata, ReadOnlySequence<byte> data, CancellationToken cancellationToken)
-        {
-            ThrowIfDisposed();
 
-            var autoAssignSequenceId = metadata.SequenceId == 0;
-            if (autoAssignSequenceId)
-                metadata.SequenceId = _sequenceId.FetchNext();
-
-            try
-            {
-                return await _executor.Execute(() => Send(metadata.Metadata, data, cancellationToken), cancellationToken).ConfigureAwait(false);
-            }
-            finally
-            {
-                if (autoAssignSequenceId)
-                    metadata.SequenceId = 0;
-            }
-        }
-
-        private async ValueTask<MessageId> Send(PulsarApi.MessageMetadata metadata, ReadOnlySequence<byte> data, CancellationToken cancellationToken)
-        {
-            var response = await _channel.Send(metadata, data, cancellationToken).ConfigureAwait(false);
-            return response.MessageId.ToMessageId();
+            _eventRegister.Register(new ProducerDisposed(_correlationId));
         }
 
         public async Task EstablishNewChannel(CancellationToken cancellationToken)
         {
-            var channel = await _executor.Execute(() => _factory.Create(cancellationToken), cancellationToken).ConfigureAwait(false);
-
-            var oldChannel = _channel;
-            _channel = channel;
-
-            if (oldChannel is not null)
-                await oldChannel.DisposeAsync().ConfigureAwait(false);
+            await _executor.Execute(() => UpdatePartitions(cancellationToken), cancellationToken).ConfigureAwait(false);
         }
 
-        private void ThrowIfDisposed()
+        private int ChoosePartitions(MessageMetadata? metadata)
         {
-            if (_isDisposed != 0)
-                throw new ProducerDisposedException(GetType().FullName!);
+            if (_producers.IsEmpty)
+            {
+                throw new LookupNotReadyException();
+            }
+            return _producersCount == 0 ? 0 : _messageRouter.ChoosePartition(metadata, _producersCount);
         }
+
+        public async ValueTask<MessageId> Send(TMessage message, CancellationToken cancellationToken = default)
+            => await _executor.Execute(() => _producers[ChoosePartitions(null)].Send(message, cancellationToken), cancellationToken).ConfigureAwait(false);
+
+        public async ValueTask<MessageId> Send(MessageMetadata metadata, TMessage message, CancellationToken cancellationToken = default)
+            => await _executor.Execute(() => _producers[ChoosePartitions(metadata)].Send(metadata, message, cancellationToken), cancellationToken).ConfigureAwait(false);
     }
 }
diff --git a/src/DotPulsar/Internal/ProducerBuilder.cs b/src/DotPulsar/Internal/ProducerBuilder.cs
index 420b2a2..95a7bdc 100644
--- a/src/DotPulsar/Internal/ProducerBuilder.cs
+++ b/src/DotPulsar/Internal/ProducerBuilder.cs
@@ -26,6 +26,7 @@ namespace DotPulsar.Internal
         private ulong _initialSequenceId;
         private string? _topic;
         private IHandleStateChanged<ProducerStateChanged>? _stateChangedHandler;
+        private IMessageRouter? _messageRouter;
 
         public ProducerBuilder(IPulsarClient pulsarClient, ISchema<TMessage> schema)
         {
@@ -65,6 +66,12 @@ namespace DotPulsar.Internal
             return this;
         }
 
+        public IProducerBuilder<TMessage> MessageRouter(IMessageRouter messageRouter)
+        {
+            _messageRouter = messageRouter;
+            return this;
+        }
+
         public IProducer<TMessage> Create()
         {
             if (string.IsNullOrEmpty(_topic))
@@ -78,6 +85,8 @@ namespace DotPulsar.Internal
                 StateChangedHandler = _stateChangedHandler
             };
 
+            if (_messageRouter != null) options.MessageRouter = _messageRouter;
+
             return _pulsarClient.CreateProducer<TMessage>(options);
         }
     }
diff --git a/src/DotPulsar/Internal/ProducerProcess.cs b/src/DotPulsar/Internal/ProducerProcess.cs
index 7e0b37f..e7230fe 100644
--- a/src/DotPulsar/Internal/ProducerProcess.cs
+++ b/src/DotPulsar/Internal/ProducerProcess.cs
@@ -15,7 +15,9 @@
 namespace DotPulsar.Internal
 {
     using Abstractions;
+    using Events;
     using System;
+    using System.Threading;
     using System.Threading.Tasks;
 
     public sealed class ProducerProcess : Process
@@ -23,30 +25,103 @@ namespace DotPulsar.Internal
         private readonly IStateManager<ProducerState> _stateManager;
         private readonly IEstablishNewChannel _producer;
 
+        // The following variables are only used when this is the process for the parent producer.
+        private readonly IRegisterEvent _processManager;
+        private int _partitionsCount;
+        private int _connectedProducersCount;
+        private int _initialProducersCount;
+
+        // The following variables are only used when this is the process for a sub producer.
+        private readonly Guid? _partitionedProducerId;
+
         public ProducerProcess(
             Guid correlationId,
             IStateManager<ProducerState> stateManager,
-            IEstablishNewChannel producer) : base(correlationId)
+            IEstablishNewChannel producer,
+            IRegisterEvent processManager,
+            Guid? partitionedProducerId = null) : base(correlationId)
         {
             _stateManager = stateManager;
             _producer = producer;
+            _processManager = processManager;
+            _partitionedProducerId = partitionedProducerId;
         }
 
         public override async ValueTask DisposeAsync()
         {
-            _stateManager.SetState(ProducerState.Closed);
+            SetState(ProducerState.Closed);
             CancellationTokenSource.Cancel();
+
             await _producer.DisposeAsync().ConfigureAwait(false);
         }
 
+        protected override void HandleExtend(IEvent e)
+        {
+            switch (e)
+            {
+                case PartitionedSubProducerStateChanged stateChanged:
+                    switch (stateChanged.ProducerState)
+                    {
+                        case ProducerState.Closed:
+                            _stateManager.SetState(ProducerState.Closed);
+                            break;
+                        case ProducerState.Connected:
+                            Interlocked.Increment(ref _connectedProducersCount);
+                            break;
+                        case ProducerState.Disconnected:
+                            // When the sub producer is initialized, the Disconnected event will be triggered.
+                            // So we need to first subtract from _initialProducersCount.
+                            if (_initialProducersCount == 0)
+                                Interlocked.Decrement(ref _connectedProducersCount);
+                            else
+                                Interlocked.Decrement(ref _initialProducersCount);
+                            break;
+                        case ProducerState.Faulted:
+                            _stateManager.SetState(ProducerState.Faulted);
+                            break;
+                        case ProducerState.PartiallyConnected: break;
+                        default: throw new ArgumentOutOfRangeException();
+                    }
+
+                    break;
+                case UpdatePartitions updatePartitions:
+                    if (updatePartitions.PartitionsCount == 0)
+                    {
+                        Interlocked.Exchange(ref _initialProducersCount, 1);
+                        Interlocked.Exchange(ref _partitionsCount, 1);
+                    }
+                    else
+                    {
+                        Interlocked.Add(ref _initialProducersCount, (int) updatePartitions.PartitionsCount - _partitionsCount);
+                        Interlocked.Exchange(ref _partitionsCount, (int) updatePartitions.PartitionsCount);
+                    }
+
+                    break;
+            }
+        }
+
         protected override void CalculateState()
         {
             if (_stateManager.IsFinalState())
                 return;
 
+            if (!IsSubProducer()) // parent producer process
+            {
+                if (_connectedProducersCount <= 0)
+                    _stateManager.SetState(ProducerState.Disconnected);
+                else if (_connectedProducersCount == _partitionsCount)
+                    _stateManager.SetState(ProducerState.Connected);
+                else
+                    _stateManager.SetState(ProducerState.PartiallyConnected);
+
+                if (_partitionsCount == 0)
+                    _ = _producer.EstablishNewChannel(CancellationTokenSource.Token);
+                return;
+            }
+
             if (ExecutorState == ExecutorState.Faulted)
             {
-                _stateManager.SetState(ProducerState.Faulted);
+                SetState(ProducerState.Faulted);
                 return;
             }
 
@@ -54,13 +129,27 @@ namespace DotPulsar.Internal
             {
                 case ChannelState.ClosedByServer:
                 case ChannelState.Disconnected:
-                    _stateManager.SetState(ProducerState.Disconnected);
+                    SetState(ProducerState.Disconnected);
                     _ = _producer.EstablishNewChannel(CancellationTokenSource.Token);
                     return;
                 case ChannelState.Connected:
-                    _stateManager.SetState(ProducerState.Connected);
+                    SetState(ProducerState.Connected);
                     return;
             }
         }
+
+        /// <summary>
+        /// Check if this is the sub producer process of the partitioned producer.
+        /// </summary>
+        private bool IsSubProducer()
+            => _partitionedProducerId.HasValue;
+
+        private void SetState(ProducerState state)
+        {
+            _stateManager.SetState(state);
+
+            if (IsSubProducer())
+                _processManager.Register(new PartitionedSubProducerStateChanged(_partitionedProducerId!.Value, state));
+        }
     }
 }
diff --git a/src/DotPulsar/Internal/PulsarClientFactory.cs b/src/DotPulsar/Internal/PulsarClientFactory.cs
new file mode 100644
index 0000000..fc58aa9
--- /dev/null
+++ b/src/DotPulsar/Internal/PulsarClientFactory.cs
@@ -0,0 +1,28 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace DotPulsar.Internal
+{
+    using Abstractions;
+    using DotPulsar.Abstractions;
+    using System;
+
+    public sealed class PulsarClientFactory
+    {
+        /// <summary>Create a PulsarClient from the given connection pool, process manager, exception handler and service url.</summary>
+        public static PulsarClient CreatePulsarClient(
+            IConnectionPool connectionPool, ProcessManager processManager, IHandleException exceptionHandler, Uri serviceUrl)
+            => new PulsarClient(connectionPool, processManager, exceptionHandler, serviceUrl);
+    }
+}
diff --git a/src/DotPulsar/Internal/RequestResponseHandler.cs b/src/DotPulsar/Internal/RequestResponseHandler.cs
index 9680d4c..011882d 100644
--- a/src/DotPulsar/Internal/RequestResponseHandler.cs
+++ b/src/DotPulsar/Internal/RequestResponseHandler.cs
@@ -39,6 +39,7 @@ namespace DotPulsar.Internal
             _getResponseIdentifier.Set(BaseCommand.Type.CloseConsumer, cmd => StandardRequest.WithConsumerId(cmd.CloseConsumer.RequestId, cmd.CloseConsumer.ConsumerId));
             _getResponseIdentifier.Set(BaseCommand.Type.CloseProducer, cmd => StandardRequest.WithProducerId(cmd.CloseProducer.RequestId, cmd.CloseProducer.ProducerId));
             _getResponseIdentifier.Set(BaseCommand.Type.LookupResponse, cmd => StandardRequest.WithRequestId(cmd.LookupTopicResponse.RequestId));
+            _getResponseIdentifier.Set(BaseCommand.Type.PartitionedMetadataResponse, cmd => StandardRequest.WithRequestId(cmd.PartitionMetadataResponse.RequestId));
             _getResponseIdentifier.Set(BaseCommand.Type.GetLastMessageIdResponse, cmd => StandardRequest.WithRequestId(cmd.GetLastMessageIdResponse.RequestId));
             _getResponseIdentifier.Set(BaseCommand.Type.GetOrCreateSchemaResponse, cmd => StandardRequest.WithRequestId(cmd.GetOrCreateSchemaResponse.RequestId));
             _getResponseIdentifier.Set(BaseCommand.Type.Success, cmd => StandardRequest.WithRequestId(cmd.Success.RequestId));
@@ -105,6 +106,12 @@ namespace DotPulsar.Internal
             return _requests.CreateTask(StandardRequest.WithRequestId(command.RequestId));
         }
 
+        public Task<BaseCommand> Outgoing(CommandPartitionedTopicMetadata command)
+        {
+            var requestId = command.RequestId = _requestId.FetchNext(); // stamp the command with the next request id
+            return _requests.CreateTask(StandardRequest.WithRequestId(requestId)); // task identified by that request id
+        }
+
         public Task<BaseCommand> Outgoing(CommandSeek command)
         {
             command.RequestId = _requestId.FetchNext();
diff --git a/src/DotPulsar/Internal/Producer.cs b/src/DotPulsar/Internal/SubProducer.cs
similarity index 98%
copy from src/DotPulsar/Internal/Producer.cs
copy to src/DotPulsar/Internal/SubProducer.cs
index b130318..5d15b0e 100644
--- a/src/DotPulsar/Internal/Producer.cs
+++ b/src/DotPulsar/Internal/SubProducer.cs
@@ -25,7 +25,7 @@ namespace DotPulsar.Internal
     using System.Threading;
     using System.Threading.Tasks;
 
-    public sealed class Producer<TMessage> : IEstablishNewChannel, IProducer<TMessage>
+    public sealed class SubProducer<TMessage> : IEstablishNewChannel, IProducer<TMessage>
     {
         private readonly ObjectPool<PulsarApi.MessageMetadata> _messageMetadataPool;
         private readonly Guid _correlationId;
@@ -41,7 +41,7 @@ namespace DotPulsar.Internal
         public Uri ServiceUrl { get; }
         public string Topic { get; }
 
-        public Producer(
+        public SubProducer(
             Guid correlationId,
             Uri serviceUrl,
             string topic,
@@ -91,7 +91,6 @@ namespace DotPulsar.Internal
             await _channel.ClosedByClient(CancellationToken.None).ConfigureAwait(false);
             await _channel.DisposeAsync().ConfigureAwait(false);
         }
-
         public async ValueTask<MessageId> Send(TMessage message, CancellationToken cancellationToken)
             => await Send(_schema.Encode(message), cancellationToken).ConfigureAwait(false);
 
diff --git a/src/DotPulsar/ProducerOptions.cs b/src/DotPulsar/ProducerOptions.cs
index 958a1f8..3e8cd0e 100644
--- a/src/DotPulsar/ProducerOptions.cs
+++ b/src/DotPulsar/ProducerOptions.cs
@@ -40,6 +40,7 @@ namespace DotPulsar
             InitialSequenceId = DefaultInitialSequenceId;
             Topic = topic;
             Schema = schema;
+            MessageRouter = new RoundRobinPartitionRouter();
         }
 
         /// <summary>
@@ -71,5 +72,10 @@ namespace DotPulsar
         /// Set the topic for this producer. This is required.
         /// </summary>
         public string Topic { get; set; }
+
+        /// <summary>
+        /// Set the message router. The default router is Round Robin partition router.
+        /// </summary>
+        public IMessageRouter MessageRouter { get; set; }
     }
 }
diff --git a/src/DotPulsar/ProducerState.cs b/src/DotPulsar/ProducerState.cs
index 5bab409..da216c0 100644
--- a/src/DotPulsar/ProducerState.cs
+++ b/src/DotPulsar/ProducerState.cs
@@ -37,6 +37,11 @@ namespace DotPulsar
         /// <summary>
         /// The producer is faulted. This is a final state.
         /// </summary>
-        Faulted
+        Faulted,
+
+        /// <summary>
+        /// Some of the sub-producers are disconnected.
+        /// </summary>
+        PartiallyConnected
     }
 }
diff --git a/src/DotPulsar/PulsarClient.cs b/src/DotPulsar/PulsarClient.cs
index 678ac83..5415485 100644
--- a/src/DotPulsar/PulsarClient.cs
+++ b/src/DotPulsar/PulsarClient.cs
@@ -20,6 +20,7 @@ namespace DotPulsar
     using Exceptions;
     using Internal;
     using Internal.Abstractions;
+    using Internal.Extensions;
     using System;
     using System.Linq;
     using System.Threading;
@@ -57,6 +58,20 @@ namespace DotPulsar
         public static IPulsarClientBuilder Builder()
             => new PulsarClientBuilder();
 
+        /// <summary>Request the partition metadata of <paramref name="topic"/> and return the partition count from the response.</summary>
+        internal async Task<uint> GetNumberOfPartitions(string topic, CancellationToken cancellationToken)
+        {
+            var request = new CommandPartitionedTopicMetadata { Topic = topic };
+            var connection = await _connectionPool.FindConnectionForTopic(topic, cancellationToken).ConfigureAwait(false);
+            var reply = await connection.Send(request, cancellationToken).ConfigureAwait(false);
+            reply.Expect(BaseCommand.Type.PartitionedMetadataResponse);
+            var metadata = reply.PartitionMetadataResponse;
+            if (metadata.Response == CommandPartitionedTopicMetadataResponse.LookupType.Failed)
+                metadata.Throw(); // a failed lookup is handed to the response's Throw() helper
+
+            return metadata.Partitions;
+        }
+
         /// <summary>
         /// Create a producer.
         /// </summary>
@@ -64,19 +79,47 @@ namespace DotPulsar
         {
             ThrowIfDisposed();
 
+            var correlationId = Guid.NewGuid();
+            var executor = new Executor(correlationId, _processManager, _exceptionHandler);
+            var stateManager = new StateManager<ProducerState>(ProducerState.Disconnected, ProducerState.Closed, ProducerState.Faulted);
+
+            var producer = new Producer<TMessage>(correlationId, ServiceUrl, options.Topic, _processManager, executor, stateManager, options, this);
+
+            if (options.StateChangedHandler is not null)
+                _ = StateMonitor.MonitorProducer(producer, options.StateChangedHandler);
+            var process = new ProducerProcess(correlationId, stateManager, producer, _processManager);
+            _processManager.Add(process);
+            process.Start();
+            return producer;
+        }
+
+        /// <summary>
+        /// Create a producer internally.
+        /// This method is used to create internal producers for partitioned producer.
+        /// </summary>
+        internal SubProducer<TMessage> NewSubProducer<TMessage>(string topic, ProducerOptions<TMessage> options, IExecute executor, Guid partitionedProducerGuid,
+            uint? partitionIndex = null)
+        {
+            ThrowIfDisposed();
+
             ICompressorFactory? compressorFactory = null;
 
+            if (partitionIndex.HasValue)
+            {
+                topic = $"{topic}-partition-{partitionIndex}";
+            }
+
             if (options.CompressionType != CompressionType.None)
             {
                 var compressionType = (Internal.PulsarApi.CompressionType) options.CompressionType;
                 compressorFactory = CompressionFactories.CompressorFactories().SingleOrDefault(f => f.CompressionType == compressionType);
+
                 if (compressorFactory is null)
                     throw new CompressionException($"Support for {compressionType} compression was not found");
             }
 
             var correlationId = Guid.NewGuid();
-            var executor = new Executor(correlationId, _processManager, _exceptionHandler);
-            var topic = options.Topic;
+
             var producerName = options.ProducerName;
             var schema = options.Schema;
             var initialSequenceId = options.InitialSequenceId;
@@ -84,10 +127,11 @@ namespace DotPulsar
             var factory = new ProducerChannelFactory(correlationId, _processManager, _connectionPool, topic, producerName, schema.SchemaInfo, compressorFactory);
             var stateManager = new StateManager<ProducerState>(ProducerState.Disconnected, ProducerState.Closed, ProducerState.Faulted);
             var initialChannel = new NotReadyChannel<TMessage>();
-            var producer = new Producer<TMessage>(correlationId, ServiceUrl, topic, initialSequenceId, _processManager, initialChannel, executor, stateManager, factory, schema);
-            if (options.StateChangedHandler is not null)
+            var producer = new SubProducer<TMessage>(correlationId, ServiceUrl, topic, initialSequenceId, _processManager, initialChannel, executor, stateManager, factory, schema);
+
+            if (options.StateChangedHandler is not null && !partitionIndex.HasValue) // the StateChangeHandler of the sub producers in partitioned producers should be disabled.
                 _ = StateMonitor.MonitorProducer(producer, options.StateChangedHandler);
-            var process = new ProducerProcess(correlationId, stateManager, producer);
+            var process = new ProducerProcess(correlationId, stateManager, producer, _processManager, partitionedProducerGuid);
             _processManager.Add(process);
             process.Start();
             return producer;
diff --git a/src/DotPulsar/RoundRobinPartitionRouter.cs b/src/DotPulsar/RoundRobinPartitionRouter.cs
new file mode 100644
index 0000000..05595d6
--- /dev/null
+++ b/src/DotPulsar/RoundRobinPartitionRouter.cs
@@ -0,0 +1,46 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace DotPulsar
+{
+    using Abstractions;
+    using HashDepot;
+    using System.Text;
+    using System.Threading;
+
+    /// <summary>
+    /// Round robin partition messages router.
+    /// If no key is provided, the producer will publish messages across all partitions in round-robin fashion
+    /// to achieve maximum throughput. If a key is specified on the message, the partitioned producer will
+    /// hash the key and assign the message to a particular partition.
+    /// This is the default mode.
+    /// </summary>
+    public sealed class RoundRobinPartitionRouter : IMessageRouter
+    {
+        private int _partitionIndex = -1;
+
+        /// <summary>
+        /// Choose a partition in round robin routing mode. For a positive partitionsCount the result is in [0, partitionsCount).
+        /// </summary>
+        public int ChoosePartition(MessageMetadata? messageMetadata, int partitionsCount)
+        {
+            // Take the remainder on unsigned values: converting the 32-bit hash (or the counter once it has wrapped
+            // past int.MaxValue) to int before '%' would produce a negative partition index.
+            if (messageMetadata?.Key is string key && key.Length > 0)
+                return (int) (MurmurHash3.Hash32(Encoding.UTF8.GetBytes(key), 0) % (uint) partitionsCount);
+
+            return (int) (unchecked((uint) Interlocked.Increment(ref _partitionIndex)) % (uint) partitionsCount);
+        }
+    }
+}
diff --git a/src/DotPulsar/SinglePartitionRouter.cs b/src/DotPulsar/SinglePartitionRouter.cs
new file mode 100644
index 0000000..3a2b563
--- /dev/null
+++ b/src/DotPulsar/SinglePartitionRouter.cs
@@ -0,0 +1,51 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace DotPulsar
+{
+    using Abstractions;
+    using HashDepot;
+    using System;
+    using System.Text;
+
+    /// <summary>
+    /// If no key is provided, the producer will randomly pick one single partition and publish all the messages
+    /// into that partition. If a key is specified on the message, the partitioned producer will hash the
+    /// key and assign the message to a particular partition.
+    /// </summary>
+    public sealed class SinglePartitionRouter : IMessageRouter
+    {
+        private int? _partitionIndex;
+
+        internal SinglePartitionRouter(int? partitionIndex = null)
+        {
+            _partitionIndex = partitionIndex;
+        }
+
+        /// <summary>
+        /// Choose a partition in single partition routing mode. Keyed messages map into [0, partitionsCount) for a positive count.
+        /// </summary>
+        public int ChoosePartition(MessageMetadata? messageMetadata, int partitionsCount)
+        {
+            // Take the remainder on the unsigned hash; casting it to int before '%' yields a negative index for hashes >= 2^31.
+            if (messageMetadata?.Key is string key && key.Length > 0)
+                return (int) (MurmurHash3.Hash32(Encoding.UTF8.GetBytes(key), 0) % (uint) partitionsCount);
+
+            // No key: pick a random partition on first use (unless one was given to the constructor) and keep using it.
+            _partitionIndex ??= new Random().Next(0, partitionsCount);
+
+            return _partitionIndex.Value;
+        }
+    }
+}
diff --git a/tests/DotPulsar.Tests/DotPulsar.Tests.csproj b/tests/DotPulsar.Tests/DotPulsar.Tests.csproj
index ca0bf24..f782fc3 100644
--- a/tests/DotPulsar.Tests/DotPulsar.Tests.csproj
+++ b/tests/DotPulsar.Tests/DotPulsar.Tests.csproj
@@ -7,6 +7,7 @@
 
   <ItemGroup>
     <PackageReference Include="FluentAssertions" Version="5.10.3" />
+    <PackageReference Include="NSubstitute" Version="4.2.2" />
     <PackageReference Include="Microsoft.NET.Test.Sdk" Version="16.10.0" />
     <PackageReference Include="xunit" Version="2.4.1" />
     <PackageReference Include="xunit.runner.visualstudio" Version="2.4.3">
diff --git a/tests/DotPulsar.Tests/Internal/PartitionedProducerProcessTests.cs b/tests/DotPulsar.Tests/Internal/PartitionedProducerProcessTests.cs
new file mode 100644
index 0000000..255b87a
--- /dev/null
+++ b/tests/DotPulsar.Tests/Internal/PartitionedProducerProcessTests.cs
@@ -0,0 +1,111 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace DotPulsar.Tests.Internal
+{
+    using Abstractions;
+    using DotPulsar.Internal;
+    using DotPulsar.Internal.Abstractions;
+    using DotPulsar.Internal.Events;
+    using NSubstitute;
+    using System;
+    using System.Collections.Concurrent;
+    using System.Collections.Generic;
+    using Xunit;
+
+    public class PartitionedProducerProcessTests
+    {
+        [Fact]
+        public void TestPartitionedProducerStateManage_WhenSubProducersStateChange_ThenPartitionedProducerStateChangeCorrectly()
+        {
+            var connectionPool = Substitute.For<IConnectionPool>();
+            var establishNewChannel = Substitute.For<IEstablishNewChannel>();
+            var producer = Substitute.For<IProducer>();
+
+            var processManager = new ProcessManager(connectionPool);
+
+            var producerGuids = new Dictionary<uint, Guid>(3);
+            var producersGroup = new ConcurrentDictionary<uint, IProducer>(Environment.ProcessorCount, 3);
+            var partitionedProducerGuid = Guid.NewGuid();
+            // Add three sub producer processes that all carry the same partitioned producer id.
+            for (uint i = 0; i < 3; i++)
+            {
+                var stateManager = new StateManager<ProducerState>(ProducerState.Disconnected, ProducerState.Closed, ProducerState.Faulted);
+                var correlationId = Guid.NewGuid();
+                var process = new ProducerProcess(correlationId, stateManager, establishNewChannel, processManager, partitionedProducerGuid);
+                producerGuids[i] = correlationId;
+                producersGroup[i] = producer;
+                processManager.Add(process);
+            }
+
+            var partitionedStateManager =
+                new StateManager<ProducerState>(ProducerState.Disconnected, ProducerState.Closed, ProducerState.Faulted);
+
+            var producerProcess = new ProducerProcess(partitionedProducerGuid, partitionedStateManager, establishNewChannel, new ProcessManager(connectionPool));
+            processManager.Add(producerProcess);
+            processManager.Register(new UpdatePartitions(partitionedProducerGuid, (uint) producersGroup.Count));
+
+            // Test initial channel: disconnect events from every sub producer keep the partitioned producer Disconnected
+            processManager.Register(new ChannelDisconnected(producerGuids[0]));
+            Assert.Equal(ProducerState.Disconnected, partitionedStateManager.CurrentState);
+            processManager.Register(new ChannelDisconnected(producerGuids[1]));
+            Assert.Equal(ProducerState.Disconnected, partitionedStateManager.CurrentState);
+            processManager.Register(new ChannelDisconnected(producerGuids[2]));
+            Assert.Equal(ProducerState.Disconnected, partitionedStateManager.CurrentState);
+
+            // Test connect: PartiallyConnected until all three sub producers are connected, then Connected
+            Assert.Equal(ProducerState.Disconnected, partitionedStateManager.CurrentState);
+            processManager.Register(new ChannelConnected(producerGuids[0]));
+            Assert.Equal(ProducerState.PartiallyConnected, partitionedStateManager.CurrentState);
+            processManager.Register(new ChannelConnected(producerGuids[1]));
+            Assert.Equal(ProducerState.PartiallyConnected, partitionedStateManager.CurrentState);
+            processManager.Register(new ChannelConnected(producerGuids[2]));
+            Assert.Equal(ProducerState.Connected, partitionedStateManager.CurrentState);
+
+            // Test disconnect: losing one sub producer drops the partitioned producer to PartiallyConnected
+            processManager.Register(new ChannelDisconnected(producerGuids[1]));
+            Assert.Equal(ProducerState.PartiallyConnected, partitionedStateManager.CurrentState);
+
+            // Test reconnect: the same sub producer connecting again restores Connected
+            processManager.Register(new ChannelConnected(producerGuids[1]));
+            Assert.Equal(ProducerState.Connected, partitionedStateManager.CurrentState);
+
+            // Test fault: a faulted executor on one sub producer faults the partitioned producer
+            processManager.Register(new ExecutorFaulted(producerGuids[1]));
+            Assert.Equal(ProducerState.Faulted, partitionedStateManager.CurrentState);
+        }
+
+        [Fact]
+        public void TestUpdatePartitions_WhenIncreasePartitions_ThenPartitionedProducerStateChangeCorrectly()
+        {
+            var connectionPool = Substitute.For<IConnectionPool>();
+            var processManager = new ProcessManager(connectionPool);
+            var establishNewChannel = Substitute.For<IEstablishNewChannel>();
+
+            var guid = Guid.NewGuid();
+            var stateManager = new StateManager<ProducerState>(ProducerState.Disconnected, ProducerState.Closed, ProducerState.Faulted);
+            var process = new ProducerProcess(guid, stateManager, establishNewChannel, new ProcessManager(connectionPool));
+            processManager.Add(process);
+            processManager.Register(new UpdatePartitions(guid, 1));
+            // One partition connected => Connected; growing to two => PartiallyConnected until the second reports Connected.
+            Assert.Equal(ProducerState.Disconnected, stateManager.CurrentState);
+            processManager.Register(new PartitionedSubProducerStateChanged(guid, ProducerState.Connected));
+            Assert.Equal(ProducerState.Connected, stateManager.CurrentState);
+            processManager.Register(new UpdatePartitions(guid, 2));
+            Assert.Equal(ProducerState.PartiallyConnected, stateManager.CurrentState);
+            processManager.Register(new PartitionedSubProducerStateChanged(guid, ProducerState.Connected));
+            Assert.Equal(ProducerState.Connected, stateManager.CurrentState);
+        }
+    }
+}
diff --git a/tests/DotPulsar.Tests/PulsarClientTests.cs b/tests/DotPulsar.Tests/PulsarClientTests.cs
new file mode 100644
index 0000000..67370d8
--- /dev/null
+++ b/tests/DotPulsar.Tests/PulsarClientTests.cs
@@ -0,0 +1,75 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace DotPulsar.Tests
+{
+    using Abstractions;
+    using DotPulsar.Internal;
+    using DotPulsar.Internal.Abstractions;
+    using DotPulsar.Internal.PulsarApi;
+    using Extensions;
+    using NSubstitute;
+    using System;
+    using System.Threading;
+    using System.Threading.Tasks;
+    using Xunit;
+    using Schema = Schema;
+
+    public class PulsarClientTests
+    {
+        /// <summary>
+        /// Creating a producer and establishing its channel must send a partition metadata request for the topic.
+        /// </summary>
+        [Fact]
+        public async Task GetPartitionedProducer_GivenPartitionedTopic_ShouldReturnPartitionProducer()
+        {
+            //Arrange
+            var topicName = "persistent://public/default/test-topic";
+            uint expectedPartitions = 3;
+
+            var connection = Substitute.For<IConnection>();
+
+            // use saveGetPartitions to assert CommandPartitionedTopicMetadata.
+            CommandPartitionedTopicMetadata? saveGetPartitions = null;
+
+            connection.Send(Arg.Any<CommandPartitionedTopicMetadata>(), Arg.Any<CancellationToken>())
+                .Returns(new BaseCommand()
+                {
+                    CommandType = BaseCommand.Type.PartitionedMetadataResponse,
+                    PartitionMetadataResponse = new CommandPartitionedTopicMetadataResponse()
+                    {
+                        Response = CommandPartitionedTopicMetadataResponse.LookupType.Success, Partitions = expectedPartitions
+                    }
+                })
+                .AndDoes(info => saveGetPartitions = (CommandPartitionedTopicMetadata) info[0]);
+
+            var connectionPool = Substitute.For<IConnectionPool>();
+            connectionPool.FindConnectionForTopic(Arg.Any<string>(), Arg.Any<CancellationToken>()).Returns(connection);
+
+            var client = PulsarClientFactory.CreatePulsarClient(connectionPool, new ProcessManager(connectionPool), Substitute.For<IHandleException>(),
+                new Uri("pulsar://localhost:6650/"));
+
+            using var timeout = new CancellationTokenSource(TimeSpan.FromSeconds(10)); // disposed after the producer
+
+            //Act
+            await using var producer = client.NewProducer(Schema.String).Topic(topicName).Create();
+            await ((IEstablishNewChannel) producer).EstablishNewChannel(timeout.Token);
+
+            //Assert
+            Assert.NotNull(saveGetPartitions);
+            Assert.Equal(topicName, saveGetPartitions?.Topic); // xUnit order is (expected, actual)
+            Assert.IsType<Producer<string>>(producer);
+        }
+    }
+}