Posted to dev@pulsar.apache.org by GitBox <gi...@apache.org> on 2021/04/12 15:37:25 UTC

[GitHub] [pulsar-dotpulsar] RobertIndie opened a new pull request #71: Basic partitioned producer

RobertIndie opened a new pull request #71:
URL: https://github.com/apache/pulsar-dotpulsar/pull/71


   This PR adds the basic feature for the partitioned producer.
   
   ### Implementation
   
   #### Change of the PulsarClient
   Get the partition count of the topic before creating the producer. The Pulsar client then creates either a normal producer or a partitioned producer, depending on that count.
   Expose an internal method `NewProducer`. It creates the internal producers for the partitioned producer, or a normal producer without checking the partition count.
   
   #### Change of the ProducerOptions
   Add a field: `MessageRouter`. The default router is the round-robin partition router.
   
   #### Add PartitionedProducerProcess
   This class maintains the state of the PartitionedProducer. It uses two variables, `_partitionsCount` and `_connectedProducersCount`, to determine whether all sub-producers are successfully connected.
   When a ProducerProcess changes state, it sends a PartitionedSubProducerStateChanged event (an IEvent) to the PartitionedProducerProcess, which then updates its own state based on the state changes of the sub-producers.
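
   The counting idea above can be sketched roughly as follows. This is a minimal illustration, not the code in this PR: `SketchProducerState` and `AggregateStateSketch` are hypothetical names, and the real `ProducerState` has more members than the three shown here.

   ```csharp
   using System;

   // Hypothetical stand-in for ProducerState; only the members needed here.
   public enum SketchProducerState { Disconnected, PartiallyConnected, Connected }

   // Hypothetical helper: derives one aggregate state from two counters.
   public sealed class AggregateStateSketch
   {
       private readonly int _partitionsCount;
       private int _connectedProducersCount;

       public AggregateStateSketch(int partitionsCount)
       {
           if (partitionsCount <= 0)
               throw new ArgumentOutOfRangeException(nameof(partitionsCount));

           _partitionsCount = partitionsCount;
       }

       // Call once per sub-producer state change; returns the aggregate state.
       public SketchProducerState OnSubProducerChanged(bool nowConnected)
       {
           _connectedProducersCount += nowConnected ? 1 : -1;

           // Keep the counter inside [0, _partitionsCount] even on stray events.
           _connectedProducersCount = Math.Max(0, Math.Min(_partitionsCount, _connectedProducersCount));

           if (_connectedProducersCount == 0)
               return SketchProducerState.Disconnected;

           return _connectedProducersCount == _partitionsCount
               ? SketchProducerState.Connected
               : SketchProducerState.PartiallyConnected;
       }
   }
   ```

   The sketch is not thread-safe; it assumes events are handled one at a time.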
   
   #### Add PartitionedProducer
   When initializing the partitioned producer, create all sub-producers. The partitioned producer will route messages to sub-producers using the message router.
   This PR does not add support for updating the partition count of the topic. I will add that in the next PR.
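
   For illustration, round-robin routing across sub-producers could look like the sketch below. `RoundRobinSketch` is a hypothetical name, and the sketch ignores message keys, which the router in this PR may handle differently.

   ```csharp
   using System.Threading;

   // Hypothetical round-robin chooser; not the RoundRobinPartitionRouter from this PR.
   public sealed class RoundRobinSketch
   {
       // Starts at -1 so the first increment selects partition 0.
       private int _counter = -1;

       public int ChoosePartition(int partitionsCount)
       {
           // Interlocked.Increment wraps on overflow; reinterpreting the result as
           // uint keeps the modulo result non-negative after the wrap.
           var next = unchecked((uint) Interlocked.Increment(ref _counter));
           return (int) (next % (uint) partitionsCount);
       }
   }
   ```

   With three partitions, successive calls yield 0, 1, 2 and then start again at 0.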
   
   ## Here are a few more things we need to discuss
   * To let partitioned producers use the ProducerOptions→StateChangeHandler, I changed the solution we originally discussed, which separated ProducerState and PartitionedProducerState. The new solution adds a PartiallyConnected state to ProducerState. Is this a good solution?
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar-dotpulsar] RobertIndie commented on a change in pull request #71: Basic partitioned producer

Posted by GitBox <gi...@apache.org>.
RobertIndie commented on a change in pull request #71:
URL: https://github.com/apache/pulsar-dotpulsar/pull/71#discussion_r655388443



##########
File path: src/DotPulsar/SinglePartitionRouter.cs
##########
@@ -0,0 +1,51 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace DotPulsar
+{
+    using Abstractions;
+    using HashDepot;
+    using System;
+    using System.Text;
+
+    /// <summary>
+    /// If no key is provided, the producer will randomly pick one single partition and publish all the messages
+    /// into that partition. While if a key is specified on the message, the partitioned producer will hash the
+    /// key and assign message to a particular partition.
+    /// </summary>
+    public sealed class SinglePartitionRouter : IMessageRouter
+    {
+        private int? _partitionIndex;
+
+        internal SinglePartitionRouter(int? partitionIndex = null)
+        {
+            _partitionIndex = partitionIndex;
+        }
+
+        /// <summary>
+        /// Choose a partition in single partition routing mode
+        /// </summary>
+        public int ChoosePartition(MessageMetadata? messageMetadata, int partitionsCount)
+        {
+            if (messageMetadata != null && !string.IsNullOrEmpty(messageMetadata.Key))
+            {
+                return (int) MurmurHash3.Hash32(Encoding.UTF8.GetBytes(messageMetadata.Key ?? string.Empty), 0) % partitionsCount;
+            }
+
+            _partitionIndex ??= new Random().Next(0, partitionsCount);

Review comment:
       This implementation is the same as the Java client's, and it makes sense. SinglePartitionRouter simply selects a random partition when the producer is created, and all subsequent messages (without a key assigned) are sent to that partition. Currently, Pulsar does not support reducing partitions. The selected partition should not change when partitions are added.
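
   A minimal sketch of the sticky behaviour described above, under some assumptions: `StickyPartitionSketch` is a hypothetical name, the key is passed as a plain string instead of `MessageMetadata`, and an FNV-1a hash stands in for MurmurHash3 so the example has no external dependency. The modulo is taken on the unsigned hash before converting to `int`; in C# a cast binds more tightly than `%`, so casting first could yield a negative index for large hash values.

   ```csharp
   #nullable enable
   using System;
   using System.Text;

   // Hypothetical sketch; not the SinglePartitionRouter from this PR.
   public sealed class StickyPartitionSketch
   {
       private int? _partitionIndex;

       public StickyPartitionSketch(int? partitionIndex = null)
           => _partitionIndex = partitionIndex;

       public int ChoosePartition(string? key, int partitionsCount)
       {
           // Keyed messages: hash the key, reduce modulo the count while still unsigned.
           if (!string.IsNullOrEmpty(key))
               return (int) (Fnv1a(Encoding.UTF8.GetBytes(key)) % (uint) partitionsCount);

           // Unkeyed messages: pick a random partition once, then keep using it,
           // even if partitionsCount grows later.
           _partitionIndex ??= new Random().Next(0, partitionsCount);
           return _partitionIndex.Value;
       }

       // Stand-in 32-bit FNV-1a hash (assumption: the PR uses MurmurHash3 instead).
       private static uint Fnv1a(byte[] data)
       {
           unchecked
           {
               var hash = 2166136261u;
               foreach (var b in data)
               {
                   hash ^= b;
                   hash *= 16777619u;
               }
               return hash;
           }
       }
   }
   ```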







[GitHub] [pulsar-dotpulsar] blankensteiner commented on a change in pull request #71: Basic partitioned producer

Posted by GitBox <gi...@apache.org>.
blankensteiner commented on a change in pull request #71:
URL: https://github.com/apache/pulsar-dotpulsar/pull/71#discussion_r643908350



##########
File path: src/DotPulsar/SinglePartitionRouter.cs
##########
@@ -0,0 +1,43 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace DotPulsar
+{
+    using Abstractions;
+    using HashDepot;
+    using System;
+    using System.Text;
+
+    public sealed class SinglePartitionRouter : IMessageRouter
+    {
+        private int? _partitionIndex;
+
+        public SinglePartitionRouter(int? partitionIndex = null)

Review comment:
       Do we want this as public?

##########
File path: tests/DotPulsar.Tests/Internal/PartitionedProducerProcessTests.cs
##########
@@ -0,0 +1,102 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace DotPulsar.Tests.Internal
+{
+    using Abstractions;
+    using DotPulsar.Internal;
+    using DotPulsar.Internal.Abstractions;
+    using DotPulsar.Internal.Events;
+    using NSubstitute;
+    using System;
+    using System.Collections.Concurrent;
+    using System.Collections.Generic;
+    using System.Threading.Tasks;
+    using Xunit;
+
+    public class PartitionedProducerProcessTests
+    {
+        [Fact]
+        public async Task TestPartitionedProducerStateManage_WhenSubProducersStateChange_ThenPartitionedProducerStateChangeCorrectly()

Review comment:
       Let's fix the warnings from GitHub Actions

##########
File path: src/DotPulsar/PulsarClient.cs
##########
@@ -57,26 +58,84 @@ public sealed class PulsarClient : IPulsarClient
         public static IPulsarClientBuilder Builder()
             => new PulsarClientBuilder();
 
+        public async Task<uint> GetNumberOfPartitions(string topic, CancellationToken cancellationToken)
+        {
+            var connection = await _connectionPool.FindConnectionForTopic(topic, cancellationToken).ConfigureAwait(false);
+            var commandPartitionedMetadata = new CommandPartitionedTopicMetadata() { Topic = topic };
+            var response = await connection.Send(commandPartitionedMetadata, cancellationToken).ConfigureAwait(false);
+
+            response.Expect(BaseCommand.Type.PartitionedMetadataResponse);
+
+            if (response.PartitionMetadataResponse.Response == CommandPartitionedTopicMetadataResponse.LookupType.Failed)
+                response.PartitionMetadataResponse.Throw();
+
+            return response.PartitionMetadataResponse.Partitions;
+        }
+
         /// <summary>
         /// Create a producer.
         /// </summary>
         public IProducer<TMessage> CreateProducer<TMessage>(ProducerOptions<TMessage> options)
         {
             ThrowIfDisposed();
 
+            var partitionsCount = GetNumberOfPartitions(options.Topic, default).Result;

Review comment:
       With regard to this, the ProducerProcess and the PartitionedProducer, I think we need to rethink it. If we want to be really resilient, we don't just want to support resizing a partitioned topic on the fly, but also the fact that a topic can change to and from a partitioned topic. This means that the "GetNumberOfPartitions" lookup is just part of the (re)connect logic. As such, it no longer makes sense to talk about a Producer and a PartitionedProducer, since they are the same. So the current 'Producer' could be renamed to 'SubProducer' and 'PartitionedProducer' to 'Producer'. A non-partitioned producer is then just a Producer with 1 and only 1 SubProducer.
   Let me know what you think about this.

##########
File path: src/DotPulsar/Internal/Abstractions/Process.cs
##########
@@ -68,11 +68,16 @@ public void Handle(IEvent e)
                 case ChannelUnsubscribed _:
                     ChannelState = ChannelState.Unsubscribed;
                     break;
+                default:HandleExtend(e);

Review comment:
       If you run the automatic code cleanup and fix the warnings, then small styling issues like these will be solved :)

##########
File path: src/DotPulsar/RoundRobinPartitionRouter.cs
##########
@@ -0,0 +1,36 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace DotPulsar
+{
+    using Abstractions;
+    using HashDepot;
+    using System.Text;
+    using System.Threading;
+
+    public sealed class RoundRobinPartitionRouter : IMessageRouter

Review comment:
       Since this is public we need some documentation

##########
File path: src/DotPulsar/SinglePartitionRouter.cs
##########
@@ -0,0 +1,43 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace DotPulsar
+{
+    using Abstractions;
+    using HashDepot;
+    using System;
+    using System.Text;
+
+    public sealed class SinglePartitionRouter : IMessageRouter

Review comment:
       Since this is public we need some documentation

##########
File path: src/DotPulsar/Internal/PartitionedProducer.cs
##########
@@ -0,0 +1,96 @@
+namespace DotPulsar.Internal
+{
+    using Abstractions;
+    using DotPulsar.Abstractions;
+    using Events;
+    using System;
+    using System.Collections.Concurrent;
+    using System.Linq;
+    using System.Threading;
+    using System.Threading.Tasks;
+
+    public sealed class PartitionedProducer<TMessage> : IProducer<TMessage>
+    {
+        private readonly Guid _correlationId;
+        private readonly IRegisterEvent _eventRegister;
+        private readonly IStateChanged<ProducerState> _state;
+        private readonly PulsarClient _pulsarClient;
+        private readonly ProducerOptions<TMessage> _options;
+        private readonly ConcurrentDictionary<int, IProducer<TMessage>> _producers;
+        private readonly IMessageRouter _messageRouter;
+        private readonly CancellationTokenSource _cts = new();
+        private readonly int _producersCount;
+        private int _isDisposed;
+        public Uri ServiceUrl { get; }
+        public string Topic { get; }
+
+        public PartitionedProducer(
+            Guid correlationId,
+            Uri serviceUrl,
+            string topic,
+            IRegisterEvent registerEvent,
+            IStateChanged<ProducerState> state,
+            uint partitionsCount,
+            ProducerOptions<TMessage> options,
+            PulsarClient pulsarClient
+        )
+        {
+            _correlationId = correlationId;
+            ServiceUrl = serviceUrl;
+            Topic = topic;
+            _eventRegister = registerEvent;
+            _state = state;
+            _isDisposed = 0;
+            _options = options;
+            _pulsarClient = pulsarClient;
+            _producersCount = (int) partitionsCount;
+            _messageRouter = options.MessageRouter;
+
+            _producers = new ConcurrentDictionary<int, IProducer<TMessage>>(Environment.ProcessorCount, _producersCount);
+            CreateSubProducers(0, _producersCount);
+        }
+
+        private void CreateSubProducers(int startIndex, int count)
+        {
+            for (int i = 0; i < count; i++)
+            {
+                var producer = _pulsarClient.NewProducer(Topic, _options, (uint)(i+startIndex), _correlationId);
+                _producers[i+startIndex] = producer;
+            }
+        }
+
+        public bool IsFinalState()
+            => _state.IsFinalState();
+
+        public bool IsFinalState(ProducerState state)
+            => _state.IsFinalState(state);
+
+        public async ValueTask<ProducerState> OnStateChangeTo(ProducerState state, CancellationToken cancellationToken = default)
+            => await _state.StateChangedTo(state, cancellationToken).ConfigureAwait(false);
+
+        public async ValueTask<ProducerState> OnStateChangeFrom(ProducerState state, CancellationToken cancellationToken = default)
+            => await _state.StateChangedFrom(state, cancellationToken).ConfigureAwait(false);
+
+        public async ValueTask DisposeAsync()
+        {
+            if (Interlocked.Exchange(ref _isDisposed, 1) != 0)
+                return;
+
+            _cts.Cancel();
+            _cts.Dispose();
+
+            foreach (var producer in _producers.Values)
+            {
+                await producer.DisposeAsync().ConfigureAwait(false);
+            }
+
+            _eventRegister.Register(new ProducerDisposed(_correlationId));
+        }
+
+        public async ValueTask<MessageId> Send(TMessage message, CancellationToken cancellationToken = default)
+            => await _producers[_messageRouter.ChoosePartition(null, _producersCount)].Send(message, cancellationToken);

Review comment:
       Missing ConfigureAwait(false);

##########
File path: src/DotPulsar/Internal/PartitionedProducer.cs
##########
@@ -0,0 +1,96 @@
+namespace DotPulsar.Internal
+{
+    using Abstractions;
+    using DotPulsar.Abstractions;
+    using Events;
+    using System;
+    using System.Collections.Concurrent;
+    using System.Linq;
+    using System.Threading;
+    using System.Threading.Tasks;
+
+    public sealed class PartitionedProducer<TMessage> : IProducer<TMessage>
+    {
+        private readonly Guid _correlationId;
+        private readonly IRegisterEvent _eventRegister;
+        private readonly IStateChanged<ProducerState> _state;
+        private readonly PulsarClient _pulsarClient;
+        private readonly ProducerOptions<TMessage> _options;
+        private readonly ConcurrentDictionary<int, IProducer<TMessage>> _producers;
+        private readonly IMessageRouter _messageRouter;
+        private readonly CancellationTokenSource _cts = new();
+        private readonly int _producersCount;
+        private int _isDisposed;
+        public Uri ServiceUrl { get; }
+        public string Topic { get; }
+
+        public PartitionedProducer(
+            Guid correlationId,
+            Uri serviceUrl,
+            string topic,
+            IRegisterEvent registerEvent,
+            IStateChanged<ProducerState> state,
+            uint partitionsCount,
+            ProducerOptions<TMessage> options,
+            PulsarClient pulsarClient
+        )
+        {
+            _correlationId = correlationId;
+            ServiceUrl = serviceUrl;
+            Topic = topic;
+            _eventRegister = registerEvent;
+            _state = state;
+            _isDisposed = 0;
+            _options = options;
+            _pulsarClient = pulsarClient;
+            _producersCount = (int) partitionsCount;
+            _messageRouter = options.MessageRouter;
+
+            _producers = new ConcurrentDictionary<int, IProducer<TMessage>>(Environment.ProcessorCount, _producersCount);
+            CreateSubProducers(0, _producersCount);
+        }
+
+        private void CreateSubProducers(int startIndex, int count)
+        {
+            for (int i = 0; i < count; i++)
+            {
+                var producer = _pulsarClient.NewProducer(Topic, _options, (uint)(i+startIndex), _correlationId);
+                _producers[i+startIndex] = producer;
+            }
+        }
+
+        public bool IsFinalState()
+            => _state.IsFinalState();
+
+        public bool IsFinalState(ProducerState state)
+            => _state.IsFinalState(state);
+
+        public async ValueTask<ProducerState> OnStateChangeTo(ProducerState state, CancellationToken cancellationToken = default)
+            => await _state.StateChangedTo(state, cancellationToken).ConfigureAwait(false);
+
+        public async ValueTask<ProducerState> OnStateChangeFrom(ProducerState state, CancellationToken cancellationToken = default)
+            => await _state.StateChangedFrom(state, cancellationToken).ConfigureAwait(false);
+
+        public async ValueTask DisposeAsync()
+        {
+            if (Interlocked.Exchange(ref _isDisposed, 1) != 0)
+                return;
+
+            _cts.Cancel();
+            _cts.Dispose();
+
+            foreach (var producer in _producers.Values)
+            {
+                await producer.DisposeAsync().ConfigureAwait(false);
+            }
+
+            _eventRegister.Register(new ProducerDisposed(_correlationId));
+        }
+
+        public async ValueTask<MessageId> Send(TMessage message, CancellationToken cancellationToken = default)
+            => await _producers[_messageRouter.ChoosePartition(null, _producersCount)].Send(message, cancellationToken);
+
+        public async ValueTask<MessageId> Send(MessageMetadata metadata, TMessage message, CancellationToken cancellationToken = default)
+            => await _producers[_messageRouter.ChoosePartition(metadata, _producersCount)].Send(message, cancellationToken);

Review comment:
       Missing ConfigureAwait(false);

##########
File path: src/DotPulsar/Internal/PartitionedProducer.cs
##########
@@ -0,0 +1,96 @@
+namespace DotPulsar.Internal
+{
+    using Abstractions;
+    using DotPulsar.Abstractions;
+    using Events;
+    using System;
+    using System.Collections.Concurrent;
+    using System.Linq;
+    using System.Threading;
+    using System.Threading.Tasks;
+
+    public sealed class PartitionedProducer<TMessage> : IProducer<TMessage>
+    {
+        private readonly Guid _correlationId;
+        private readonly IRegisterEvent _eventRegister;
+        private readonly IStateChanged<ProducerState> _state;
+        private readonly PulsarClient _pulsarClient;
+        private readonly ProducerOptions<TMessage> _options;
+        private readonly ConcurrentDictionary<int, IProducer<TMessage>> _producers;
+        private readonly IMessageRouter _messageRouter;
+        private readonly CancellationTokenSource _cts = new();
+        private readonly int _producersCount;
+        private int _isDisposed;
+        public Uri ServiceUrl { get; }
+        public string Topic { get; }
+
+        public PartitionedProducer(
+            Guid correlationId,
+            Uri serviceUrl,
+            string topic,
+            IRegisterEvent registerEvent,
+            IStateChanged<ProducerState> state,
+            uint partitionsCount,
+            ProducerOptions<TMessage> options,
+            PulsarClient pulsarClient
+        )
+        {
+            _correlationId = correlationId;
+            ServiceUrl = serviceUrl;
+            Topic = topic;
+            _eventRegister = registerEvent;
+            _state = state;
+            _isDisposed = 0;
+            _options = options;
+            _pulsarClient = pulsarClient;
+            _producersCount = (int) partitionsCount;
+            _messageRouter = options.MessageRouter;
+
+            _producers = new ConcurrentDictionary<int, IProducer<TMessage>>(Environment.ProcessorCount, _producersCount);

Review comment:
       Multiple tasks will be reading the dictionary, but only one will update it. We can therefore use another constructor since 'concurrencyLevel' is only 1 and not {Environment.ProcessorCount}.
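
   As a small illustration of that suggestion, the dictionary can be created with a concurrency level of 1 through the `ConcurrentDictionary(int concurrencyLevel, int capacity)` constructor. `SubProducerMapSketch` is a hypothetical helper, and `string` stands in for the sub-producer type.

   ```csharp
   using System.Collections.Concurrent;

   // Hypothetical helper; string values stand in for IProducer<TMessage>.
   public static class SubProducerMapSketch
   {
       public static ConcurrentDictionary<int, string> Create(int partitionsCount)
           // One writer at a time is expected, so a concurrency level of 1 suffices;
           // reads on ConcurrentDictionary do not take locks.
           => new ConcurrentDictionary<int, string>(concurrencyLevel: 1, capacity: partitionsCount);
   }
   ```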







[GitHub] [pulsar-dotpulsar] RobertIndie commented on a change in pull request #71: Basic partitioned producer

Posted by GitBox <gi...@apache.org>.
RobertIndie commented on a change in pull request #71:
URL: https://github.com/apache/pulsar-dotpulsar/pull/71#discussion_r655126010



##########
File path: src/DotPulsar/Internal/Producer.cs
##########
@@ -1,159 +1,122 @@
-/*
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-namespace DotPulsar.Internal
+namespace DotPulsar.Internal
 {
     using Abstractions;
     using DotPulsar.Abstractions;
-    using DotPulsar.Exceptions;
-    using DotPulsar.Internal.Extensions;
     using Events;
-    using Microsoft.Extensions.ObjectPool;
+    using Exceptions;
     using System;
-    using System.Buffers;
+    using System.Collections.Concurrent;
     using System.Threading;
     using System.Threading.Tasks;
 
-    public sealed class Producer<TMessage> : IEstablishNewChannel, IProducer<TMessage>
+    public sealed class Producer<TMessage> : IProducer<TMessage>
     {
-        private readonly ObjectPool<PulsarApi.MessageMetadata> _messageMetadataPool;
         private readonly Guid _correlationId;
         private readonly IRegisterEvent _eventRegister;
-        private IProducerChannel _channel;
         private readonly IExecute _executor;
         private readonly IStateChanged<ProducerState> _state;
-        private readonly IProducerChannelFactory _factory;
-        private readonly ISchema<TMessage> _schema;
-        private readonly SequenceId _sequenceId;
+        private readonly PulsarClient _pulsarClient;
+        private readonly ProducerOptions<TMessage> _options;
+        private readonly ConcurrentDictionary<int, IProducer<TMessage>> _producers;
+        private readonly IMessageRouter _messageRouter;
+        private readonly CancellationTokenSource _cts = new();
+        private int _producersCount;
         private int _isDisposed;
-
         public Uri ServiceUrl { get; }
         public string Topic { get; }
 
         public Producer(
             Guid correlationId,
             Uri serviceUrl,
             string topic,
-            ulong initialSequenceId,
             IRegisterEvent registerEvent,
-            IProducerChannel initialChannel,
             IExecute executor,
             IStateChanged<ProducerState> state,
-            IProducerChannelFactory factory,
-            ISchema<TMessage> schema)
+            ProducerOptions<TMessage> options,
+            PulsarClient pulsarClient
+        )
         {
-            var messageMetadataPolicy = new DefaultPooledObjectPolicy<PulsarApi.MessageMetadata>();
-            _messageMetadataPool = new DefaultObjectPool<PulsarApi.MessageMetadata>(messageMetadataPolicy);
             _correlationId = correlationId;
             ServiceUrl = serviceUrl;
             Topic = topic;
-            _sequenceId = new SequenceId(initialSequenceId);
             _eventRegister = registerEvent;
-            _channel = initialChannel;
             _executor = executor;
             _state = state;
-            _factory = factory;
-            _schema = schema;
             _isDisposed = 0;
+            _options = options;
+            _pulsarClient = pulsarClient;
+            _messageRouter = options.MessageRouter;
+
+            _producers = new ConcurrentDictionary<int, IProducer<TMessage>>(1, 31);
 
-            _eventRegister.Register(new ProducerCreated(_correlationId));
+            UpdatePartitions(_cts.Token);
         }
 
-        public async ValueTask<ProducerState> OnStateChangeTo(ProducerState state, CancellationToken cancellationToken)
-            => await _state.StateChangedTo(state, cancellationToken).ConfigureAwait(false);
+        private void CreateSubProducers(int startIndex, int count)
+        {
+            if (count == 0)
+            {
+                var producer = _pulsarClient.NewSubProducer(Topic, _options, _executor, _correlationId);
+                _producers[0] = producer;
+                return;
+            }
 
-        public async ValueTask<ProducerState> OnStateChangeFrom(ProducerState state, CancellationToken cancellationToken)
-            => await _state.StateChangedFrom(state, cancellationToken).ConfigureAwait(false);
+            for (var i = startIndex; i < count; ++i)
+            {
+                var producer = _pulsarClient.NewSubProducer(Topic, _options, _executor, _correlationId, (uint) i);
+                _producers[i] = producer;
+            }
+        }
+
+        private async void UpdatePartitions(CancellationToken cancellationToken)

Review comment:
       I have moved it to `EstablishNewChannel`. However, `EstablishNewChannel` currently contains two kinds of logic: creating a new channel and updating partitions. Can we rename `EstablishNewChannel` to `InitialConnection`?







[GitHub] [pulsar-dotpulsar] blankensteiner commented on a change in pull request #71: Basic partitioned producer

Posted by GitBox <gi...@apache.org>.
blankensteiner commented on a change in pull request #71:
URL: https://github.com/apache/pulsar-dotpulsar/pull/71#discussion_r613025495



##########
File path: src/DotPulsar/Internal/Events/PartitionedSubProducerStateChanged.cs
##########
@@ -0,0 +1,36 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace DotPulsar.Internal.Events
+{
+    using Abstractions;
+    using System;
+
+    /// <summary>
+    /// Representation of the sub producer of a partitioned producer state change.
+    /// </summary>
+    public class PartitionedSubProducerStateChanged : IEvent

Review comment:
       Let's mark it as sealed

##########
File path: src/DotPulsar/Internal/PartitionedProducer.cs
##########
@@ -0,0 +1,99 @@
+namespace DotPulsar.Internal
+{
+    using Abstractions;
+    using DotPulsar.Abstractions;
+    using Events;
+    using System;
+    using System.Collections.Concurrent;
+    using System.Linq;
+    using System.Threading;
+    using System.Threading.Tasks;
+
+    public class PartitionedProducer<TMessage> : IProducer<TMessage>
+    {
+        private readonly Guid _correlationId;
+        private readonly IRegisterEvent _eventRegister;
+        private readonly IStateChanged<ProducerState> _state;
+        private readonly PulsarClient _pulsarClient;
+        private readonly ProducerOptions<TMessage> _options;
+        private readonly ConcurrentDictionary<int, IProducer<TMessage>> _producers;
+        private readonly IMessageRouter _messageRouter;
+        private readonly CancellationTokenSource _cts = new();
+        private int _isDisposed;
+        private int _producersCount;
+        public Uri ServiceUrl { get; }
+        public string Topic { get; }
+
+        public PartitionedProducer(
+            Guid correlationId,
+            Uri serviceUrl,
+            string topic,
+            IRegisterEvent registerEvent,
+            IStateChanged<ProducerState> state,
+            uint partitionsCount,
+            ProducerOptions<TMessage> options,
+            PulsarClient pulsarClient
+        )
+        {
+            _correlationId = correlationId;
+            ServiceUrl = serviceUrl;
+            Topic = topic;
+            _eventRegister = registerEvent;
+            _state = state;
+            _isDisposed = 0;
+            _options = options;
+            _pulsarClient = pulsarClient;
+            _producersCount = (int) partitionsCount;
+            _messageRouter = options.MessageRouter;
+
+            _producers = new ConcurrentDictionary<int, IProducer<TMessage>>(Environment.ProcessorCount, _producersCount);
+            CreateSubProducers(0, _producersCount).Wait();

Review comment:
       .Wait() and .Result on a task are blocking and as such a no-go.
   We need to rethink how to create a partitioned producer without "Create" needing to be an async operation.
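
   One possible direction, sketched under assumptions rather than proposed as the design: the constructor starts the asynchronous work and stores the task, and `Send` awaits it. `LazyInitProducerSketch` and the `lookupPartitions` delegate are hypothetical, and routing by message length is only a placeholder.

   ```csharp
   using System;
   using System.Threading;
   using System.Threading.Tasks;

   // Hypothetical sketch; not a DotPulsar type.
   public sealed class LazyInitProducerSketch
   {
       private readonly Task<int> _partitionsTask;

       public LazyInitProducerSketch(Func<CancellationToken, Task<int>> lookupPartitions)
       {
           // Start the lookup but do not block on it: no .Wait() and no .Result.
           _partitionsTask = lookupPartitions(CancellationToken.None);
       }

       // Returns the chosen partition index to keep the sketch self-contained.
       public async ValueTask<int> Send(string message)
       {
           // Callers wait asynchronously until the partition count is known.
           var partitions = await _partitionsTask.ConfigureAwait(false);
           return message.Length % partitions; // placeholder routing
       }
   }
   ```

   With this shape, creation stays synchronous and a failed lookup surfaces when `Send` is awaited.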

##########
File path: src/DotPulsar/Internal/PartitionedProducer.cs
##########
@@ -0,0 +1,99 @@
+namespace DotPulsar.Internal
+{
+    using Abstractions;
+    using DotPulsar.Abstractions;
+    using Events;
+    using System;
+    using System.Collections.Concurrent;
+    using System.Linq;
+    using System.Threading;
+    using System.Threading.Tasks;
+
+    public class PartitionedProducer<TMessage> : IProducer<TMessage>
+    {
+        private readonly Guid _correlationId;
+        private readonly IRegisterEvent _eventRegister;
+        private readonly IStateChanged<ProducerState> _state;
+        private readonly PulsarClient _pulsarClient;
+        private readonly ProducerOptions<TMessage> _options;
+        private readonly ConcurrentDictionary<int, IProducer<TMessage>> _producers;
+        private readonly IMessageRouter _messageRouter;
+        private readonly CancellationTokenSource _cts = new();
+        private int _isDisposed;
+        private int _producersCount;
+        public Uri ServiceUrl { get; }
+        public string Topic { get; }
+
+        public PartitionedProducer(
+            Guid correlationId,
+            Uri serviceUrl,
+            string topic,
+            IRegisterEvent registerEvent,
+            IStateChanged<ProducerState> state,
+            uint partitionsCount,
+            ProducerOptions<TMessage> options,
+            PulsarClient pulsarClient
+        )
+        {
+            _correlationId = correlationId;
+            ServiceUrl = serviceUrl;
+            Topic = topic;
+            _eventRegister = registerEvent;
+            _state = state;
+            _isDisposed = 0;
+            _options = options;
+            _pulsarClient = pulsarClient;
+            _producersCount = (int) partitionsCount;
+            _messageRouter = options.MessageRouter;
+
+            _producers = new ConcurrentDictionary<int, IProducer<TMessage>>(Environment.ProcessorCount, _producersCount);
+            CreateSubProducers(0, _producersCount).Wait();
+        }
+
+        private async Task CreateSubProducers(int startIndex, int count)
+        {
+            await Task.WhenAll(Enumerable.Range(startIndex, count).Select(n =>

Review comment:
       The producers can just be created; there is no need to create a task for creating them.
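
A minimal sketch of what this could look like, assuming sub-producer creation is a cheap synchronous call (as `NewProducer` appears to be in the diff). `SubProducerFactory` and the `createProducer` delegate are hypothetical names for illustration.

```csharp
using System;
using System.Collections.Generic;

public static class SubProducerFactory
{
    // createProducer is a hypothetical stand-in for a synchronous call such as NewProducer.
    public static IReadOnlyList<T> CreateAll<T>(int count, Func<int, T> createProducer)
    {
        var producers = new T[count];

        // A plain loop: no Task.Run, no Task.WhenAll, and nothing to block on afterwards.
        for (var partition = 0; partition < count; partition++)
            producers[partition] = createProducer(partition);

        return producers;
    }
}
```

With this shape there is no task to wait on, so the constructor would no longer need `.Wait()`.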

##########
File path: src/DotPulsar/Internal/PartitionedProducer.cs
##########
@@ -0,0 +1,99 @@
+namespace DotPulsar.Internal
+{
+    using Abstractions;
+    using DotPulsar.Abstractions;
+    using Events;
+    using System;
+    using System.Collections.Concurrent;
+    using System.Linq;
+    using System.Threading;
+    using System.Threading.Tasks;
+
+    public class PartitionedProducer<TMessage> : IProducer<TMessage>
+    {
+        private readonly Guid _correlationId;
+        private readonly IRegisterEvent _eventRegister;
+        private readonly IStateChanged<ProducerState> _state;
+        private readonly PulsarClient _pulsarClient;
+        private readonly ProducerOptions<TMessage> _options;
+        private readonly ConcurrentDictionary<int, IProducer<TMessage>> _producers;
+        private readonly IMessageRouter _messageRouter;
+        private readonly CancellationTokenSource _cts = new();
+        private int _isDisposed;
+        private int _producersCount;
+        public Uri ServiceUrl { get; }
+        public string Topic { get; }
+
+        public PartitionedProducer(
+            Guid correlationId,
+            Uri serviceUrl,
+            string topic,
+            IRegisterEvent registerEvent,
+            IStateChanged<ProducerState> state,
+            uint partitionsCount,
+            ProducerOptions<TMessage> options,
+            PulsarClient pulsarClient
+        )
+        {
+            _correlationId = correlationId;
+            ServiceUrl = serviceUrl;
+            Topic = topic;
+            _eventRegister = registerEvent;
+            _state = state;
+            _isDisposed = 0;
+            _options = options;
+            _pulsarClient = pulsarClient;
+            _producersCount = (int) partitionsCount;
+            _messageRouter = options.MessageRouter;
+
+            _producers = new ConcurrentDictionary<int, IProducer<TMessage>>(Environment.ProcessorCount, _producersCount);
+            CreateSubProducers(0, _producersCount).Wait();
+        }
+
+        private async Task CreateSubProducers(int startIndex, int count)
+        {
+            await Task.WhenAll(Enumerable.Range(startIndex, count).Select(n =>
+            {
+                return Task.Run( () =>
+                {
+                    var producer = _pulsarClient.NewProducer(Topic, _options, (uint)n, _correlationId);
+                    _producers[n] = producer;
+                }, _cts.Token);
+            }).ToList()).ConfigureAwait(false);
+        }
+
+        public bool IsFinalState()
+            => _state.IsFinalState();
+
+        public bool IsFinalState(ProducerState state)
+            => _state.IsFinalState(state);
+
+        public async ValueTask<ProducerState> OnStateChangeTo(ProducerState state, CancellationToken cancellationToken = default)
+            => await _state.StateChangedTo(state, cancellationToken).ConfigureAwait(false);
+
+        public async ValueTask<ProducerState> OnStateChangeFrom(ProducerState state, CancellationToken cancellationToken = default)
+            => await _state.StateChangedFrom(state, cancellationToken).ConfigureAwait(false);
+
+        public async ValueTask DisposeAsync()
+        {
+            if (Interlocked.Exchange(ref _isDisposed, 1) != 0)
+                return;
+
+            _cts.Cancel();
+            _cts.Dispose();
+
+            foreach (var producer in _producers.Values)
+            {
+                await producer.DisposeAsync().ConfigureAwait(false);
+            }
+
+            _eventRegister.Register(new ProducerDisposed(_correlationId));
+        }
+
+        public ValueTask<MessageId> Send(TMessage message, CancellationToken cancellationToken = default)
+            => _producers[_messageRouter.ChoosePartition(null, _producersCount)].Send(message, cancellationToken);

Review comment:
       Doing this is totally valid, but in the name of consistency let's await it

##########
File path: src/DotPulsar/Internal/Events/UpdatePartitions.cs
##########
@@ -0,0 +1,35 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace DotPulsar.Internal.Events
+{
+    using Abstractions;
+    using System;
+
+    /// <summary>
+    /// Representation of the partitions count of the partitioned topic updating.
+    /// </summary>
+    public class UpdatePartitions : IEvent

Review comment:
       Let's mark it as sealed

##########
File path: src/DotPulsar/Internal/PartitionedProducer.cs
##########
@@ -0,0 +1,99 @@
+namespace DotPulsar.Internal
+{
+    using Abstractions;
+    using DotPulsar.Abstractions;
+    using Events;
+    using System;
+    using System.Collections.Concurrent;
+    using System.Linq;
+    using System.Threading;
+    using System.Threading.Tasks;
+
+    public class PartitionedProducer<TMessage> : IProducer<TMessage>
+    {
+        private readonly Guid _correlationId;
+        private readonly IRegisterEvent _eventRegister;
+        private readonly IStateChanged<ProducerState> _state;
+        private readonly PulsarClient _pulsarClient;
+        private readonly ProducerOptions<TMessage> _options;
+        private readonly ConcurrentDictionary<int, IProducer<TMessage>> _producers;
+        private readonly IMessageRouter _messageRouter;
+        private readonly CancellationTokenSource _cts = new();
+        private int _isDisposed;
+        private int _producersCount;
+        public Uri ServiceUrl { get; }
+        public string Topic { get; }
+
+        public PartitionedProducer(
+            Guid correlationId,
+            Uri serviceUrl,
+            string topic,
+            IRegisterEvent registerEvent,
+            IStateChanged<ProducerState> state,
+            uint partitionsCount,
+            ProducerOptions<TMessage> options,
+            PulsarClient pulsarClient
+        )
+        {
+            _correlationId = correlationId;
+            ServiceUrl = serviceUrl;
+            Topic = topic;
+            _eventRegister = registerEvent;
+            _state = state;
+            _isDisposed = 0;
+            _options = options;
+            _pulsarClient = pulsarClient;
+            _producersCount = (int) partitionsCount;
+            _messageRouter = options.MessageRouter;
+
+            _producers = new ConcurrentDictionary<int, IProducer<TMessage>>(Environment.ProcessorCount, _producersCount);
+            CreateSubProducers(0, _producersCount).Wait();
+        }
+
+        private async Task CreateSubProducers(int startIndex, int count)
+        {
+            await Task.WhenAll(Enumerable.Range(startIndex, count).Select(n =>
+            {
+                return Task.Run( () =>
+                {
+                    var producer = _pulsarClient.NewProducer(Topic, _options, (uint)n, _correlationId);
+                    _producers[n] = producer;
+                }, _cts.Token);
+            }).ToList()).ConfigureAwait(false);
+        }
+
+        public bool IsFinalState()
+            => _state.IsFinalState();
+
+        public bool IsFinalState(ProducerState state)
+            => _state.IsFinalState(state);
+
+        public async ValueTask<ProducerState> OnStateChangeTo(ProducerState state, CancellationToken cancellationToken = default)
+            => await _state.StateChangedTo(state, cancellationToken).ConfigureAwait(false);
+
+        public async ValueTask<ProducerState> OnStateChangeFrom(ProducerState state, CancellationToken cancellationToken = default)
+            => await _state.StateChangedFrom(state, cancellationToken).ConfigureAwait(false);
+
+        public async ValueTask DisposeAsync()
+        {
+            if (Interlocked.Exchange(ref _isDisposed, 1) != 0)
+                return;
+
+            _cts.Cancel();
+            _cts.Dispose();
+
+            foreach (var producer in _producers.Values)
+            {
+                await producer.DisposeAsync().ConfigureAwait(false);
+            }
+
+            _eventRegister.Register(new ProducerDisposed(_correlationId));
+        }
+
+        public ValueTask<MessageId> Send(TMessage message, CancellationToken cancellationToken = default)
+            => _producers[_messageRouter.ChoosePartition(null, _producersCount)].Send(message, cancellationToken);
+
+        public ValueTask<MessageId> Send(MessageMetadata metadata, TMessage message, CancellationToken cancellationToken = default)
+            => _producers[_messageRouter.ChoosePartition(metadata, _producersCount)].Send(message, cancellationToken);

Review comment:
       Doing this is totally valid, but in the name of consistency let's await it
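
As a small sketch of the awaited form, assuming the goal is only stylistic consistency with the other async members. `ISender`, `ConstantSender`, and `RoutingSender` are hypothetical types, not DotPulsar's interfaces.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical stand-in for a sub-producer.
public interface ISender
{
    ValueTask<long> Send(string message, CancellationToken cancellationToken = default);
}

// Fake sub-producer that always reports the same id; for demonstration only.
public sealed class ConstantSender : ISender
{
    private readonly long _id;
    public ConstantSender(long id) => _id = id;

    public ValueTask<long> Send(string message, CancellationToken cancellationToken = default)
        => new ValueTask<long>(_id);
}

public sealed class RoutingSender : ISender
{
    private readonly ISender[] _senders;
    private readonly Func<int, int> _choosePartition;

    public RoutingSender(ISender[] senders, Func<int, int> choosePartition)
    {
        _senders = senders;
        _choosePartition = choosePartition;
    }

    // Awaits the routed call instead of returning its ValueTask directly,
    // matching the style of the other async members.
    public async ValueTask<long> Send(string message, CancellationToken cancellationToken = default)
        => await _senders[_choosePartition(_senders.Length)].Send(message, cancellationToken).ConfigureAwait(false);
}
```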

##########
File path: src/DotPulsar/Internal/PartitionedProducer.cs
##########
@@ -0,0 +1,99 @@
+namespace DotPulsar.Internal
+{
+    using Abstractions;
+    using DotPulsar.Abstractions;
+    using Events;
+    using System;
+    using System.Collections.Concurrent;
+    using System.Linq;
+    using System.Threading;
+    using System.Threading.Tasks;
+
+    public class PartitionedProducer<TMessage> : IProducer<TMessage>

Review comment:
       Let's mark it as sealed

##########
File path: src/DotPulsar/PulsarClient.cs
##########
@@ -37,7 +38,7 @@ public sealed class PulsarClient : IPulsarClient
 
         public Uri ServiceUrl { get; }
 
-        internal PulsarClient(
+        public PulsarClient(

Review comment:
       We can't make this public. If you need to create an instance for testing, then (in the internal namespace) you can create a PulsarClientFactory

##########
File path: src/DotPulsar/PulsarClient.cs
##########
@@ -57,26 +58,84 @@ public sealed class PulsarClient : IPulsarClient
         public static IPulsarClientBuilder Builder()
             => new PulsarClientBuilder();
 
+        public async Task<uint> GetNumberOfPartitions(string topic, CancellationToken cancellationToken)
+        {
+            var connection = await _connectionPool.FindConnectionForTopic(topic, cancellationToken).ConfigureAwait(false);
+            var commandPartitionedMetadata = new CommandPartitionedTopicMetadata() { Topic = topic };
+            var response = await connection.Send(commandPartitionedMetadata, cancellationToken).ConfigureAwait(false);
+
+            response.Expect(BaseCommand.Type.PartitionedMetadataResponse);
+
+            if (response.PartitionMetadataResponse.Response == CommandPartitionedTopicMetadataResponse.LookupType.Failed)
+                response.PartitionMetadataResponse.Throw();
+
+            return response.PartitionMetadataResponse.Partitions;
+        }
+
         /// <summary>
         /// Create a producer.
         /// </summary>
         public IProducer<TMessage> CreateProducer<TMessage>(ProducerOptions<TMessage> options)
         {
             ThrowIfDisposed();
 
+            var partitionsCount = GetNumberOfPartitions(options.Topic, default).Result;

Review comment:
       .Result is a no-go, so we need to rethink how the partitioned producer is created. Maybe we should have a talk about that on Slack?

##########
File path: tests/DotPulsar.Tests/DotPulsar.Tests.csproj
##########
@@ -8,6 +8,7 @@
   <ItemGroup>
     <PackageReference Include="FluentAssertions" Version="5.10.3" />
     <PackageReference Include="Microsoft.NET.Test.Sdk" Version="16.9.1" />
+    <PackageReference Include="Moq" Version="4.16.1" />

Review comment:
       Like we talked about, we'll be using NSubstitute instead.

##########
File path: src/DotPulsar/PulsarClient.cs
##########
@@ -57,26 +58,84 @@ public sealed class PulsarClient : IPulsarClient
         public static IPulsarClientBuilder Builder()
             => new PulsarClientBuilder();
 
+        public async Task<uint> GetNumberOfPartitions(string topic, CancellationToken cancellationToken)

Review comment:
       Is it intentional that this is public?

##########
File path: src/DotPulsar/RoundRobinPartitionRouter.cs
##########
@@ -0,0 +1,36 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace DotPulsar
+{
+    using Abstractions;
+    using HashDepot;
+    using System.Text;
+    using System.Threading;
+
+    public class RoundRobinPartitionRouter : IMessageRouter

Review comment:
       Let's make this sealed

##########
File path: tests/DotPulsar.Tests/Internal/PartitionedProducerProcessTests.cs
##########
@@ -0,0 +1,105 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace DotPulsar.Tests.Internal
+{
+    using Abstractions;
+    using DotPulsar.Internal;
+    using DotPulsar.Internal.Abstractions;
+    using DotPulsar.Internal.Events;
+    using Moq;
+    using System;
+    using System.Collections.Concurrent;
+    using System.Collections.Generic;
+    using Xunit;
+
+    public class PartitionedProducerProcessTests
+    {
+        [Fact]
+        public void TestPartitionedProducerStateManage()
+        {
+            var connectionPoolMock = new Mock<IConnectionPool>(MockBehavior.Loose);
+            var connectionPool = connectionPoolMock.Object;
+            var establishNewChannelMock = new Mock<IEstablishNewChannel>(MockBehavior.Loose);
+            var establishNewChannel = establishNewChannelMock.Object;
+            var producerMock = new Mock<IProducer>(MockBehavior.Loose);
+            var producer = producerMock.Object;
+
+            var processManager = new ProcessManager(connectionPool);
+
+            var producerProcesses = new Dictionary<uint, ProducerProcess>(3);
+            var producerGuids = new Dictionary<uint, Guid>(3);
+            var producersGroup = new ConcurrentDictionary<uint, IProducer>(Environment.ProcessorCount, 3);
+            var partitionedProducerGuid = Guid.NewGuid();
+
+            for (uint i = 0; i < 3; i++)
+            {
+                var stateManager = new StateManager<ProducerState>(ProducerState.Disconnected, ProducerState.Closed, ProducerState.Faulted);
+                var correlationId = Guid.NewGuid();
+                var process = new ProducerProcess(correlationId, stateManager, establishNewChannel, processManager, partitionedProducerGuid, i);
+                producerGuids[i] = correlationId;
+                producerProcesses[i] = process;
+                producersGroup[i] = producer;
+                processManager.Add(process);
+            }
+
+            var partitionedStateManager =
+                new StateManager<ProducerState>(ProducerState.Disconnected, ProducerState.Closed, ProducerState.Faulted);
+            var partitionedProducerProcess = new PartitionedProducerProcess(partitionedProducerGuid, partitionedStateManager, producersGroup.Count);
+            processManager.Add(partitionedProducerProcess);
+
+            // Test connect
+            Assert.Equal(ProducerState.Disconnected, partitionedStateManager.CurrentState);
+            processManager.Register(new ChannelConnected(producerGuids[0]));
+            Assert.Equal(ProducerState.PartiallyConnected, partitionedStateManager.CurrentState);
+            processManager.Register(new ChannelConnected(producerGuids[1]));
+            Assert.Equal(ProducerState.PartiallyConnected, partitionedStateManager.CurrentState);
+            processManager.Register(new ChannelConnected(producerGuids[2]));
+            Assert.Equal(ProducerState.Connected, partitionedStateManager.CurrentState);
+
+            // Test disconnect
+            processManager.Register(new ChannelDisconnected(producerGuids[1]));
+            Assert.Equal(ProducerState.PartiallyConnected, partitionedStateManager.CurrentState);
+
+            // Test reconnect
+            processManager.Register(new ChannelConnected(producerGuids[1]));
+            Assert.Equal(ProducerState.Connected, partitionedStateManager.CurrentState);
+
+            // Test fault
+            processManager.Register(new ExecutorFaulted(producerGuids[1]));
+            Assert.Equal(ProducerState.Faulted, partitionedStateManager.CurrentState);
+        }
+
+        [Fact]
+        public void TestUpdatePartitions()

Review comment:
       The test naming convention is [Method/Feature]_[Given/When]Something_[Should/Then]ExpectedResult

##########
File path: tests/DotPulsar.Tests/PulsarClientTests.cs
##########
@@ -0,0 +1,69 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace DotPulsar.Tests
+{
+    using Abstractions;
+    using DotPulsar.Internal;
+    using DotPulsar.Internal.Abstractions;
+    using DotPulsar.Internal.PulsarApi;
+    using Moq;
+    using System;
+    using System.Threading;
+    using Xunit;
+
+    public class PulsarClientTests
+    {
+        [Fact]
+        public async void GetPartitions_GivenPartitionedTopic_ShouldReturnPartitionsNumber()

Review comment:
       Use "Task" instead of "void"

##########
File path: src/DotPulsar/Internal/PartitionedProducerProcess.cs
##########
@@ -0,0 +1,93 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace DotPulsar.Internal
+{
+    using Abstractions;
+    using Events;
+    using System;
+    using System.Threading;
+    using System.Threading.Tasks;
+
+    public class PartitionedProducerProcess : IProcess

Review comment:
       As we talked about, this can just be deleted as you decided to make changes to ProducerProcess instead.

##########
File path: tests/DotPulsar.Tests/Internal/PartitionedProducerProcessTests.cs
##########
@@ -0,0 +1,105 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace DotPulsar.Tests.Internal
+{
+    using Abstractions;
+    using DotPulsar.Internal;
+    using DotPulsar.Internal.Abstractions;
+    using DotPulsar.Internal.Events;
+    using Moq;
+    using System;
+    using System.Collections.Concurrent;
+    using System.Collections.Generic;
+    using Xunit;
+
+    public class PartitionedProducerProcessTests
+    {
+        [Fact]
+        public void TestPartitionedProducerStateManage()

Review comment:
       The test naming convention is [Method/Feature]_[Given/When]Something_[Should/Then]ExpectedResult




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar-dotpulsar] RagingKore commented on a change in pull request #71: Basic partitioned producer

Posted by GitBox <gi...@apache.org>.
RagingKore commented on a change in pull request #71:
URL: https://github.com/apache/pulsar-dotpulsar/pull/71#discussion_r637382425



##########
File path: src/DotPulsar/PulsarClient.cs
##########
@@ -57,26 +58,84 @@ public sealed class PulsarClient : IPulsarClient
         public static IPulsarClientBuilder Builder()
             => new PulsarClientBuilder();
 
+        public async Task<uint> GetNumberOfPartitions(string topic, CancellationToken cancellationToken)
+        {
+            var connection = await _connectionPool.FindConnectionForTopic(topic, cancellationToken).ConfigureAwait(false);
+            var commandPartitionedMetadata = new CommandPartitionedTopicMetadata() { Topic = topic };
+            var response = await connection.Send(commandPartitionedMetadata, cancellationToken).ConfigureAwait(false);
+
+            response.Expect(BaseCommand.Type.PartitionedMetadataResponse);
+
+            if (response.PartitionMetadataResponse.Response == CommandPartitionedTopicMetadataResponse.LookupType.Failed)
+                response.PartitionMetadataResponse.Throw();
+
+            return response.PartitionMetadataResponse.Partitions;
+        }
+
         /// <summary>
         /// Create a producer.
         /// </summary>
         public IProducer<TMessage> CreateProducer<TMessage>(ProducerOptions<TMessage> options)
         {
             ThrowIfDisposed();
 
+            var partitionsCount = GetNumberOfPartitions(options.Topic, default).Result;

Review comment:
       Yeah, .Result is a problem. For this particular case we could use GetAwaiter().GetResult() because it rethrows the original exception and keeps the stack trace.
   Still, I would consider either delaying that initialization until we actually want to start producing messages, or making the method truly async.
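
To illustrate the difference mentioned here, a minimal sketch: both calls block, but `.Result` surfaces the failure wrapped in an `AggregateException`, while `GetAwaiter().GetResult()` rethrows the original exception. `BlockingWaitDemo` and `FailingLookup` are hypothetical names; the lookup is simulated with an already-faulted task.

```csharp
using System;
using System.Threading.Tasks;

public static class BlockingWaitDemo
{
    // Simulates a partition lookup that has already failed.
    private static Task<uint> FailingLookup()
        => Task.FromException<uint>(new InvalidOperationException("lookup failed"));

    // Returns the type name of the exception observed through .Result.
    public static string ViaResult()
    {
        try { _ = FailingLookup().Result; return "none"; }
        catch (Exception e) { return e.GetType().Name; }
    }

    // Returns the type name of the exception observed through GetAwaiter().GetResult().
    public static string ViaGetResult()
    {
        try { _ = FailingLookup().GetAwaiter().GetResult(); return "none"; }
        catch (Exception e) { return e.GetType().Name; }
    }
}
```

Neither form removes the blocking itself, which is why deferring the lookup or making creation async is still the cleaner option.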







[GitHub] [pulsar-dotpulsar] blankensteiner merged pull request #71: Basic partitioned producer

Posted by GitBox <gi...@apache.org>.
blankensteiner merged pull request #71:
URL: https://github.com/apache/pulsar-dotpulsar/pull/71


   





[GitHub] [pulsar-dotpulsar] blankensteiner commented on a change in pull request #71: Basic partitioned producer

Posted by GitBox <gi...@apache.org>.
blankensteiner commented on a change in pull request #71:
URL: https://github.com/apache/pulsar-dotpulsar/pull/71#discussion_r655371689



##########
File path: src/DotPulsar/SinglePartitionRouter.cs
##########
@@ -0,0 +1,51 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace DotPulsar
+{
+    using Abstractions;
+    using HashDepot;
+    using System;
+    using System.Text;
+
+    /// <summary>
+    /// If no key is provided, the producer will randomly pick one single partition and publish all the messages
+    /// into that partition. While if a key is specified on the message, the partitioned producer will hash the
+    /// key and assign message to a particular partition.
+    /// </summary>
+    public sealed class SinglePartitionRouter : IMessageRouter
+    {
+        private int? _partitionIndex;
+
+        internal SinglePartitionRouter(int? partitionIndex = null)
+        {
+            _partitionIndex = partitionIndex;
+        }
+
+        /// <summary>
+        /// Choose a partition in single partition routing mode
+        /// </summary>
+        public int ChoosePartition(MessageMetadata? messageMetadata, int partitionsCount)
+        {
+            if (messageMetadata != null && !string.IsNullOrEmpty(messageMetadata.Key))
+            {
+                return (int) MurmurHash3.Hash32(Encoding.UTF8.GetBytes(messageMetadata.Key ?? string.Empty), 0) % partitionsCount;
+            }
+
+            _partitionIndex ??= new Random().Next(0, partitionsCount);

Review comment:
       Shouldn't this re-initialize if the partitionsCount has changed?
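
A rough sketch of the behaviour this question suggests, assuming the router should pick a new random partition whenever the reported count differs from the one it last saw. `ResettingSinglePartitionPicker` is a hypothetical class, not the router that was merged, and it ignores message keys and thread safety for brevity.

```csharp
using System;

public sealed class ResettingSinglePartitionPicker
{
    private readonly Random _random = new Random();
    private int _partitionIndex = -1;
    private int _lastPartitionsCount = -1;

    public int ChoosePartition(int partitionsCount)
    {
        if (partitionsCount <= 0)
            throw new ArgumentOutOfRangeException(nameof(partitionsCount));

        // Pick again on first use or when the partition count has changed.
        if (_partitionIndex < 0 || partitionsCount != _lastPartitionsCount)
        {
            _partitionIndex = _random.Next(0, partitionsCount);
            _lastPartitionsCount = partitionsCount;
        }

        return _partitionIndex;
    }
}
```

Whether re-picking on every change is desirable is a design question: it spreads load after a resize but moves keyless messages to a different partition.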







[GitHub] [pulsar-dotpulsar] blankensteiner commented on a change in pull request #71: Basic partitioned producer

Posted by GitBox <gi...@apache.org>.
blankensteiner commented on a change in pull request #71:
URL: https://github.com/apache/pulsar-dotpulsar/pull/71#discussion_r654686078



##########
File path: samples/Producing/Program.cs
##########
@@ -50,7 +50,7 @@ private static async Task Main()
         private static async Task ProduceMessages(IProducer<string> producer, CancellationToken cancellationToken)
         {
             var delay = TimeSpan.FromSeconds(5);
-
+            await producer.StateChangedTo(ProducerState.Connected, cancellationToken: cancellationToken).ConfigureAwait(false);

Review comment:
       This can be removed since you should never wait for a state before using the producer

##########
File path: src/DotPulsar/Internal/Producer.cs
##########
@@ -1,159 +1,122 @@
-/*
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-namespace DotPulsar.Internal
+namespace DotPulsar.Internal
 {
     using Abstractions;
     using DotPulsar.Abstractions;
-    using DotPulsar.Exceptions;
-    using DotPulsar.Internal.Extensions;
     using Events;
-    using Microsoft.Extensions.ObjectPool;
+    using Exceptions;
     using System;
-    using System.Buffers;
+    using System.Collections.Concurrent;
     using System.Threading;
     using System.Threading.Tasks;
 
-    public sealed class Producer<TMessage> : IEstablishNewChannel, IProducer<TMessage>
+    public sealed class Producer<TMessage> : IProducer<TMessage>
     {
-        private readonly ObjectPool<PulsarApi.MessageMetadata> _messageMetadataPool;
         private readonly Guid _correlationId;
         private readonly IRegisterEvent _eventRegister;
-        private IProducerChannel _channel;
         private readonly IExecute _executor;
         private readonly IStateChanged<ProducerState> _state;
-        private readonly IProducerChannelFactory _factory;
-        private readonly ISchema<TMessage> _schema;
-        private readonly SequenceId _sequenceId;
+        private readonly PulsarClient _pulsarClient;
+        private readonly ProducerOptions<TMessage> _options;
+        private readonly ConcurrentDictionary<int, IProducer<TMessage>> _producers;
+        private readonly IMessageRouter _messageRouter;
+        private readonly CancellationTokenSource _cts = new();
+        private int _producersCount;
         private int _isDisposed;
-
         public Uri ServiceUrl { get; }
         public string Topic { get; }
 
         public Producer(
             Guid correlationId,
             Uri serviceUrl,
             string topic,
-            ulong initialSequenceId,
             IRegisterEvent registerEvent,
-            IProducerChannel initialChannel,
             IExecute executor,
             IStateChanged<ProducerState> state,
-            IProducerChannelFactory factory,
-            ISchema<TMessage> schema)
+            ProducerOptions<TMessage> options,
+            PulsarClient pulsarClient
+        )
         {
-            var messageMetadataPolicy = new DefaultPooledObjectPolicy<PulsarApi.MessageMetadata>();
-            _messageMetadataPool = new DefaultObjectPool<PulsarApi.MessageMetadata>(messageMetadataPolicy);
             _correlationId = correlationId;
             ServiceUrl = serviceUrl;
             Topic = topic;
-            _sequenceId = new SequenceId(initialSequenceId);
             _eventRegister = registerEvent;
-            _channel = initialChannel;
             _executor = executor;
             _state = state;
-            _factory = factory;
-            _schema = schema;
             _isDisposed = 0;
+            _options = options;
+            _pulsarClient = pulsarClient;
+            _messageRouter = options.MessageRouter;
+
+            _producers = new ConcurrentDictionary<int, IProducer<TMessage>>(1, 31);
 
-            _eventRegister.Register(new ProducerCreated(_correlationId));
+            UpdatePartitions(_cts.Token);
         }
 
-        public async ValueTask<ProducerState> OnStateChangeTo(ProducerState state, CancellationToken cancellationToken)
-            => await _state.StateChangedTo(state, cancellationToken).ConfigureAwait(false);
+        private void CreateSubProducers(int startIndex, int count)
+        {
+            if (count == 0)
+            {
+                var producer = _pulsarClient.NewSubProducer(Topic, _options, _executor, _correlationId);
+                _producers[0] = producer;
+                return;
+            }
 
-        public async ValueTask<ProducerState> OnStateChangeFrom(ProducerState state, CancellationToken cancellationToken)
-            => await _state.StateChangedFrom(state, cancellationToken).ConfigureAwait(false);
+            for (var i = startIndex; i < count; ++i)
+            {
+                var producer = _pulsarClient.NewSubProducer(Topic, _options, _executor, _correlationId, (uint) i);
+                _producers[i] = producer;
+            }
+        }
+
+        private async void UpdatePartitions(CancellationToken cancellationToken)

Review comment:
       "async void" is a no-go, because it is a "fire and forget". I think an exception here will crash the application.
   We need to make this call part of the ongoing connect/reconnect feature (meaning the process/management).
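
A minimal sketch of the concern above, not the PR's actual code: when the method returns `Task` instead of `void`, the caller can await it and observe a failure. `PartitionUpdateSketch`, `LookupPartitions` and `TryUpdate` are hypothetical names used only for illustration, and the lookup always fails so that the error path is visible.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

public static class PartitionUpdateSketch
{
    // Hypothetical stand-in for the partition lookup; it always fails on purpose.
    private static async Task<uint> LookupPartitions(CancellationToken cancellationToken)
    {
        await Task.Delay(10, cancellationToken).ConfigureAwait(false);
        throw new InvalidOperationException("lookup failed");
    }

    // Returning Task (instead of void) lets the caller await the work and see its exceptions.
    public static async Task UpdatePartitions(CancellationToken cancellationToken)
    {
        var count = await LookupPartitions(cancellationToken).ConfigureAwait(false);
        Console.WriteLine($"Partitions: {count}");
    }

    // The caller decides how to react to a failure, e.g. by scheduling a retry.
    public static async Task<bool> TryUpdate(CancellationToken cancellationToken)
    {
        try
        {
            await UpdatePartitions(cancellationToken).ConfigureAwait(false);
            return true;
        }
        catch (InvalidOperationException)
        {
            // If UpdatePartitions were "async void", this catch block would never see the exception.
            return false;
        }
    }
}
```

With `async void`, the exception would instead be raised outside the caller's control, which is why the comment asks for the call to be owned by the connect/reconnect process.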

##########
File path: tests/DotPulsar.Tests/Internal/PartitionedProducerProcessTests.cs
##########
@@ -0,0 +1,115 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace DotPulsar.Tests.Internal
+{
+    using Abstractions;
+    using DotPulsar.Internal;
+    using DotPulsar.Internal.Abstractions;
+    using DotPulsar.Internal.Events;
+    using NSubstitute;
+    using System;
+    using System.Collections.Concurrent;
+    using System.Collections.Generic;
+    using System.Threading.Tasks;
+    using Xunit;
+
+    public class PartitionedProducerProcessTests
+    {
+        [Fact]
+        public Task TestPartitionedProducerStateManage_WhenSubProducersStateChange_ThenPartitionedProducerStateChangeCorrectly()

Review comment:
       When not using async/await, you can just have the test return void.

##########
File path: src/DotPulsar/Internal/Producer.cs
##########
@@ -1,159 +1,122 @@
-/*
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-namespace DotPulsar.Internal
+namespace DotPulsar.Internal

Review comment:
       We need the apache license header here

##########
File path: tests/DotPulsar.Tests/Internal/PartitionedProducerProcessTests.cs
##########
@@ -0,0 +1,115 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace DotPulsar.Tests.Internal
+{
+    using Abstractions;
+    using DotPulsar.Internal;
+    using DotPulsar.Internal.Abstractions;
+    using DotPulsar.Internal.Events;
+    using NSubstitute;
+    using System;
+    using System.Collections.Concurrent;
+    using System.Collections.Generic;
+    using System.Threading.Tasks;
+    using Xunit;
+
+    public class PartitionedProducerProcessTests
+    {
+        [Fact]
+        public Task TestPartitionedProducerStateManage_WhenSubProducersStateChange_ThenPartitionedProducerStateChangeCorrectly()
+        {
+            var connectionPool = Substitute.For<IConnectionPool>();
+            var establishNewChannel = Substitute.For<IEstablishNewChannel>();
+            var producer = Substitute.For<IProducer>();
+
+            var processManager = new ProcessManager(connectionPool);
+
+            var producerGuids = new Dictionary<uint, Guid>(3);
+            var producersGroup = new ConcurrentDictionary<uint, IProducer>(Environment.ProcessorCount, 3);
+            var partitionedProducerGuid = Guid.NewGuid();
+
+            for (uint i = 0; i < 3; i++)
+            {
+                var stateManager = new StateManager<ProducerState>(ProducerState.Disconnected, ProducerState.Closed, ProducerState.Faulted);
+                var correlationId = Guid.NewGuid();
+                var process = new ProducerProcess(correlationId, stateManager, establishNewChannel, processManager, partitionedProducerGuid);
+                producerGuids[i] = correlationId;
+                producersGroup[i] = producer;
+                processManager.Add(process);
+            }
+
+            var partitionedStateManager =
+                new StateManager<ProducerState>(ProducerState.Disconnected, ProducerState.Closed, ProducerState.Faulted);
+
+            var producerProcess = new ProducerProcess(partitionedProducerGuid, partitionedStateManager, null, new ProcessManager(connectionPool));
+            processManager.Add(producerProcess);
+            processManager.Register(new UpdatePartitions(partitionedProducerGuid, (uint) producersGroup.Count));
+
+            // Test initial channel
+            processManager.Register(new ChannelDisconnected(producerGuids[0]));
+            Assert.Equal(ProducerState.Disconnected, partitionedStateManager.CurrentState);
+            processManager.Register(new ChannelDisconnected(producerGuids[1]));
+            Assert.Equal(ProducerState.Disconnected, partitionedStateManager.CurrentState);
+            processManager.Register(new ChannelDisconnected(producerGuids[2]));
+            Assert.Equal(ProducerState.Disconnected, partitionedStateManager.CurrentState);
+
+            // Test connect
+            Assert.Equal(ProducerState.Disconnected, partitionedStateManager.CurrentState);
+            processManager.Register(new ChannelConnected(producerGuids[0]));
+            Assert.Equal(ProducerState.PartiallyConnected, partitionedStateManager.CurrentState);
+            processManager.Register(new ChannelConnected(producerGuids[1]));
+            Assert.Equal(ProducerState.PartiallyConnected, partitionedStateManager.CurrentState);
+            processManager.Register(new ChannelConnected(producerGuids[2]));
+            Assert.Equal(ProducerState.Connected, partitionedStateManager.CurrentState);
+
+            // Test disconnect
+            processManager.Register(new ChannelDisconnected(producerGuids[1]));
+            Assert.Equal(ProducerState.PartiallyConnected, partitionedStateManager.CurrentState);
+
+            // Test reconnect
+            processManager.Register(new ChannelConnected(producerGuids[1]));
+            Assert.Equal(ProducerState.Connected, partitionedStateManager.CurrentState);
+
+            // Test fault
+            processManager.Register(new ExecutorFaulted(producerGuids[1]));
+            Assert.Equal(ProducerState.Faulted, partitionedStateManager.CurrentState);
+
+            return Task.CompletedTask;
+        }
+
+        [Fact]
+        public Task TestUpdatePartitions_WhenIncreasePartitions_ThenPartitionedProducerStateChangeCorrectly()

Review comment:
       When not using async/await, you can just have the test return void.

##########
File path: src/DotPulsar/Internal/ProducerProcess.cs
##########
@@ -15,52 +15,141 @@
 namespace DotPulsar.Internal
 {
     using Abstractions;
+    using Events;
     using System;
+    using System.Threading;
     using System.Threading.Tasks;
 
     public sealed class ProducerProcess : Process
     {
         private readonly IStateManager<ProducerState> _stateManager;
-        private readonly IEstablishNewChannel _producer;
+        private readonly IEstablishNewChannel? _producer;
+
+        // The following variables are only used when this is the process for parent producer.
+        private readonly IRegisterEvent _processManager;
+        private int _partitionsCount;
+        private int _connectedProducersCount;
+        private int _initialProducersCount;
+
+        // The following variables are only used for sub producer
+        private readonly Guid? _partitionedProducerId;
 
         public ProducerProcess(
             Guid correlationId,
             IStateManager<ProducerState> stateManager,
-            IEstablishNewChannel producer) : base(correlationId)
+            IEstablishNewChannel? producer,
+            IRegisterEvent processManager,
+            Guid? partitionedProducerId = null) : base(correlationId)
         {
             _stateManager = stateManager;
             _producer = producer;
+            _processManager = processManager;
+            _partitionedProducerId = partitionedProducerId;
         }
 
         public override async ValueTask DisposeAsync()
         {
-            _stateManager.SetState(ProducerState.Closed);
+            SetState(ProducerState.Closed);
             CancellationTokenSource.Cancel();
-            await _producer.DisposeAsync().ConfigureAwait(false);
+
+            if (_producer != null)
+                await _producer.DisposeAsync().ConfigureAwait(false);
+        }
+
+        protected override void HandleExtend(IEvent e)

Review comment:
       I think we can do this a bit more simply. The SubProducers have their own processes and own states, so we just need to monitor them (like the end-user does with the producer). When one of the SubProducers changes state, we just need to look at the "total states" and then, if needed, update the PartitionedProducer's state (the one the end-user is monitoring).
   The component monitoring the state changes of the SubProducers could also initially and periodically check what the total number of partitions is for the topic and therefore also create and delete the SubProducers.
   I'm probably not being very clear now, so should I give it a try?
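
The "total states" idea could be sketched roughly as follows. This is an illustration under assumptions, not the implementation that was merged: `SketchProducerState` is a local stand-in for the real `ProducerState` enum, `Aggregate` is a hypothetical helper, and the rules (any fault wins, all connected means `Connected`, some connected means `PartiallyConnected`) are inferred from the assertions in the test quoted earlier in this thread.

```csharp
using System;
using System.Collections.Generic;

// Local stand-in for the state enum; only the members used by the sketch are included.
public enum SketchProducerState { Disconnected, PartiallyConnected, Connected, Faulted }

public static class AggregateStateSketch
{
    // Derives the state of the partitioned producer from the states of its sub-producers.
    public static SketchProducerState Aggregate(IReadOnlyCollection<SketchProducerState> subStates)
    {
        var connected = 0;

        foreach (var state in subStates)
        {
            // A single faulted sub-producer faults the whole partitioned producer.
            if (state == SketchProducerState.Faulted)
                return SketchProducerState.Faulted;

            if (state == SketchProducerState.Connected)
                connected++;
        }

        if (connected == 0)
            return SketchProducerState.Disconnected;

        return connected == subStates.Count
            ? SketchProducerState.Connected
            : SketchProducerState.PartiallyConnected;
    }
}
```

A monitoring component would call such a helper whenever a sub-producer reports a state change and only update the outer state when the aggregate differs from the current one.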







[GitHub] [pulsar-dotpulsar] RobertIndie commented on a change in pull request #71: Basic partitioned producer

Posted by GitBox <gi...@apache.org>.
RobertIndie commented on a change in pull request #71:
URL: https://github.com/apache/pulsar-dotpulsar/pull/71#discussion_r655121619



##########
File path: src/DotPulsar/Internal/Producer.cs
##########
@@ -1,159 +1,122 @@
-/*
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-namespace DotPulsar.Internal
+namespace DotPulsar.Internal

Review comment:
       Sorry for the mistake. :joy:

##########
File path: src/DotPulsar/Internal/Producer.cs
##########
@@ -1,159 +1,122 @@
-/*
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-namespace DotPulsar.Internal
+namespace DotPulsar.Internal
 {
     using Abstractions;
     using DotPulsar.Abstractions;
-    using DotPulsar.Exceptions;
-    using DotPulsar.Internal.Extensions;
     using Events;
-    using Microsoft.Extensions.ObjectPool;
+    using Exceptions;
     using System;
-    using System.Buffers;
+    using System.Collections.Concurrent;
     using System.Threading;
     using System.Threading.Tasks;
 
-    public sealed class Producer<TMessage> : IEstablishNewChannel, IProducer<TMessage>
+    public sealed class Producer<TMessage> : IProducer<TMessage>
     {
-        private readonly ObjectPool<PulsarApi.MessageMetadata> _messageMetadataPool;
         private readonly Guid _correlationId;
         private readonly IRegisterEvent _eventRegister;
-        private IProducerChannel _channel;
         private readonly IExecute _executor;
         private readonly IStateChanged<ProducerState> _state;
-        private readonly IProducerChannelFactory _factory;
-        private readonly ISchema<TMessage> _schema;
-        private readonly SequenceId _sequenceId;
+        private readonly PulsarClient _pulsarClient;
+        private readonly ProducerOptions<TMessage> _options;
+        private readonly ConcurrentDictionary<int, IProducer<TMessage>> _producers;
+        private readonly IMessageRouter _messageRouter;
+        private readonly CancellationTokenSource _cts = new();
+        private int _producersCount;
         private int _isDisposed;
-
         public Uri ServiceUrl { get; }
         public string Topic { get; }
 
         public Producer(
             Guid correlationId,
             Uri serviceUrl,
             string topic,
-            ulong initialSequenceId,
             IRegisterEvent registerEvent,
-            IProducerChannel initialChannel,
             IExecute executor,
             IStateChanged<ProducerState> state,
-            IProducerChannelFactory factory,
-            ISchema<TMessage> schema)
+            ProducerOptions<TMessage> options,
+            PulsarClient pulsarClient
+        )
         {
-            var messageMetadataPolicy = new DefaultPooledObjectPolicy<PulsarApi.MessageMetadata>();
-            _messageMetadataPool = new DefaultObjectPool<PulsarApi.MessageMetadata>(messageMetadataPolicy);
             _correlationId = correlationId;
             ServiceUrl = serviceUrl;
             Topic = topic;
-            _sequenceId = new SequenceId(initialSequenceId);
             _eventRegister = registerEvent;
-            _channel = initialChannel;
             _executor = executor;
             _state = state;
-            _factory = factory;
-            _schema = schema;
             _isDisposed = 0;
+            _options = options;
+            _pulsarClient = pulsarClient;
+            _messageRouter = options.MessageRouter;
+
+            _producers = new ConcurrentDictionary<int, IProducer<TMessage>>(1, 31);
 
-            _eventRegister.Register(new ProducerCreated(_correlationId));
+            UpdatePartitions(_cts.Token);
         }
 
-        public async ValueTask<ProducerState> OnStateChangeTo(ProducerState state, CancellationToken cancellationToken)
-            => await _state.StateChangedTo(state, cancellationToken).ConfigureAwait(false);
+        private void CreateSubProducers(int startIndex, int count)
+        {
+            if (count == 0)
+            {
+                var producer = _pulsarClient.NewSubProducer(Topic, _options, _executor, _correlationId);
+                _producers[0] = producer;
+                return;
+            }
 
-        public async ValueTask<ProducerState> OnStateChangeFrom(ProducerState state, CancellationToken cancellationToken)
-            => await _state.StateChangedFrom(state, cancellationToken).ConfigureAwait(false);
+            for (var i = startIndex; i < count; ++i)
+            {
+                var producer = _pulsarClient.NewSubProducer(Topic, _options, _executor, _correlationId, (uint) i);
+                _producers[i] = producer;
+            }
+        }
+
+        private async void UpdatePartitions(CancellationToken cancellationToken)

Review comment:
       I have moved it to `EstablishNewChannel`. However, `EstablishNewChannel` currently contains two kinds of logic: `Create new channel` and `Update partitions`. Can we rename `EstablishNewChannel` to `InitialConnection`?

##########
File path: src/DotPulsar/Internal/ProducerProcess.cs
##########
@@ -15,52 +15,141 @@
 namespace DotPulsar.Internal
 {
     using Abstractions;
+    using Events;
     using System;
+    using System.Threading;
     using System.Threading.Tasks;
 
     public sealed class ProducerProcess : Process
     {
         private readonly IStateManager<ProducerState> _stateManager;
-        private readonly IEstablishNewChannel _producer;
+        private readonly IEstablishNewChannel? _producer;
+
+        // The following variables are only used when this is the process for parent producer.
+        private readonly IRegisterEvent _processManager;
+        private int _partitionsCount;
+        private int _connectedProducersCount;
+        private int _initialProducersCount;
+
+        // The following variables are only used for sub producer
+        private readonly Guid? _partitionedProducerId;
 
         public ProducerProcess(
             Guid correlationId,
             IStateManager<ProducerState> stateManager,
-            IEstablishNewChannel producer) : base(correlationId)
+            IEstablishNewChannel? producer,
+            IRegisterEvent processManager,
+            Guid? partitionedProducerId = null) : base(correlationId)
         {
             _stateManager = stateManager;
             _producer = producer;
+            _processManager = processManager;
+            _partitionedProducerId = partitionedProducerId;
         }
 
         public override async ValueTask DisposeAsync()
         {
-            _stateManager.SetState(ProducerState.Closed);
+            SetState(ProducerState.Closed);
             CancellationTokenSource.Cancel();
-            await _producer.DisposeAsync().ConfigureAwait(false);
+
+            if (_producer != null)
+                await _producer.DisposeAsync().ConfigureAwait(false);
+        }
+
+        protected override void HandleExtend(IEvent e)

Review comment:
       What confuses me is the UpdatePartitions part. I think the Producer class needs to be responsible for creating sub-producers; the producer process does not create or delete sub-producers. If we want to implement it this way, we may need to share the collection of sub-producers between Producer and ProducerProcess, which seems more complex.

##########
File path: src/DotPulsar/SinglePartitionRouter.cs
##########
@@ -0,0 +1,51 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace DotPulsar
+{
+    using Abstractions;
+    using HashDepot;
+    using System;
+    using System.Text;
+
+    /// <summary>
+    /// If no key is provided, the producer will randomly pick one single partition and publish all the messages
+    /// into that partition. While if a key is specified on the message, the partitioned producer will hash the
+    /// key and assign message to a particular partition.
+    /// </summary>
+    public sealed class SinglePartitionRouter : IMessageRouter
+    {
+        private int? _partitionIndex;
+
+        internal SinglePartitionRouter(int? partitionIndex = null)
+        {
+            _partitionIndex = partitionIndex;
+        }
+
+        /// <summary>
+        /// Choose a partition in single partition routing mode
+        /// </summary>
+        public int ChoosePartition(MessageMetadata? messageMetadata, int partitionsCount)
+        {
+            if (messageMetadata != null && !string.IsNullOrEmpty(messageMetadata.Key))
+            {
+                return (int) MurmurHash3.Hash32(Encoding.UTF8.GetBytes(messageMetadata.Key ?? string.Empty), 0) % partitionsCount;
+            }
+
+            _partitionIndex ??= new Random().Next(0, partitionsCount);

Review comment:
       This implementation is the same as the Java client's, and it makes sense. SinglePartitionRouter simply selects a random partition when creating the producer, and all subsequent messages (without a key assigned) are sent to that partition. Currently, Pulsar does not support reducing the number of partitions, and the selected partition should not change when the number of partitions grows.
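
The behaviour described above can be sketched as below. This is a simplified illustration, not the class from the PR: `SinglePartitionRouterSketch` is a hypothetical name, it takes the key as a plain string rather than `MessageMetadata`, and FNV-1a is used only as a dependency-free stand-in for MurmurHash3. The sketch also takes the remainder before casting to `int`, because casting a 32-bit unsigned hash to `int` first can produce a negative index.

```csharp
using System;
using System.Text;

public sealed class SinglePartitionRouterSketch
{
    // Chosen once on first use and then kept, even if the partition count grows later.
    private int? _partitionIndex;

    // FNV-1a, used here only as a stand-in for MurmurHash3 to keep the sketch self-contained.
    private static uint Hash(string key)
    {
        unchecked
        {
            var hash = 2166136261u;
            foreach (var b in Encoding.UTF8.GetBytes(key))
            {
                hash ^= b;
                hash *= 16777619u;
            }
            return hash;
        }
    }

    // The key may be null or empty, which means "no key assigned".
    public int ChoosePartition(string key, int partitionsCount)
    {
        if (!string.IsNullOrEmpty(key))
            return (int) (Hash(key) % (uint) partitionsCount); // remainder first, then cast

        _partitionIndex ??= new Random().Next(0, partitionsCount);
        return _partitionIndex.Value;
    }
}
```

Because partitions can only be added, the index chosen against the original count stays valid after expansion, which is the reason given above for not re-initializing it.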










[GitHub] [pulsar-dotpulsar] RobertIndie commented on a change in pull request #71: Basic partitioned producer

Posted by GitBox <gi...@apache.org>.
RobertIndie commented on a change in pull request #71:
URL: https://github.com/apache/pulsar-dotpulsar/pull/71#discussion_r655154397



##########
File path: src/DotPulsar/Internal/ProducerProcess.cs
##########
@@ -15,52 +15,141 @@
 namespace DotPulsar.Internal
 {
     using Abstractions;
+    using Events;
     using System;
+    using System.Threading;
     using System.Threading.Tasks;
 
     public sealed class ProducerProcess : Process
     {
         private readonly IStateManager<ProducerState> _stateManager;
-        private readonly IEstablishNewChannel _producer;
+        private readonly IEstablishNewChannel? _producer;
+
+        // The following variables are only used when this is the process for parent producer.
+        private readonly IRegisterEvent _processManager;
+        private int _partitionsCount;
+        private int _connectedProducersCount;
+        private int _initialProducersCount;
+
+        // The following variables are only used for sub producer
+        private readonly Guid? _partitionedProducerId;
 
         public ProducerProcess(
             Guid correlationId,
             IStateManager<ProducerState> stateManager,
-            IEstablishNewChannel producer) : base(correlationId)
+            IEstablishNewChannel? producer,
+            IRegisterEvent processManager,
+            Guid? partitionedProducerId = null) : base(correlationId)
         {
             _stateManager = stateManager;
             _producer = producer;
+            _processManager = processManager;
+            _partitionedProducerId = partitionedProducerId;
         }
 
         public override async ValueTask DisposeAsync()
         {
-            _stateManager.SetState(ProducerState.Closed);
+            SetState(ProducerState.Closed);
             CancellationTokenSource.Cancel();
-            await _producer.DisposeAsync().ConfigureAwait(false);
+
+            if (_producer != null)
+                await _producer.DisposeAsync().ConfigureAwait(false);
+        }
+
+        protected override void HandleExtend(IEvent e)

Review comment:
       What confuses me is the UpdatePartitions part. I think the Producer class needs to be responsible for creating sub-producers; the producer process does not create or delete sub-producers. If we want to implement it this way, we may need to share the collection of sub-producers between Producer and ProducerProcess, which seems more complex.







[GitHub] [pulsar-dotpulsar] RobertIndie commented on a change in pull request #71: Basic partitioned producer

Posted by GitBox <gi...@apache.org>.
RobertIndie commented on a change in pull request #71:
URL: https://github.com/apache/pulsar-dotpulsar/pull/71#discussion_r637383831



##########
File path: src/DotPulsar/PulsarClient.cs
##########
@@ -57,26 +58,84 @@ public sealed class PulsarClient : IPulsarClient
         public static IPulsarClientBuilder Builder()
             => new PulsarClientBuilder();
 
+        public async Task<uint> GetNumberOfPartitions(string topic, CancellationToken cancellationToken)
+        {
+            var connection = await _connectionPool.FindConnectionForTopic(topic, cancellationToken).ConfigureAwait(false);
+            var commandPartitionedMetadata = new CommandPartitionedTopicMetadata() { Topic = topic };
+            var response = await connection.Send(commandPartitionedMetadata, cancellationToken).ConfigureAwait(false);
+
+            response.Expect(BaseCommand.Type.PartitionedMetadataResponse);
+
+            if (response.PartitionMetadataResponse.Response == CommandPartitionedTopicMetadataResponse.LookupType.Failed)
+                response.PartitionMetadataResponse.Throw();
+
+            return response.PartitionMetadataResponse.Partitions;
+        }
+
         /// <summary>
         /// Create a producer.
         /// </summary>
         public IProducer<TMessage> CreateProducer<TMessage>(ProducerOptions<TMessage> options)
         {
             ThrowIfDisposed();
 
+            var partitionsCount = GetNumberOfPartitions(options.Topic, default).Result;

Review comment:
       Thanks for your suggestion. 
   Each time a Producer is created, the partition count needs to be obtained first to determine whether it is a partitioned producer, so I think it is necessary to get the partition count here.
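
As a rough illustration of why the count is needed up front: a count of zero means a single producer on the topic itself, while a positive count means one sub-producer per partition. The helper below is hypothetical and only computes the topic names; the `-partition-N` suffix follows Pulsar's naming convention for the partitions of a partitioned topic.

```csharp
using System;
using System.Collections.Generic;

public static class SubProducerTopicsSketch
{
    // Returns the topics that sub-producers would be created for, given the partition count.
    public static IReadOnlyList<string> TopicsFor(string topic, uint partitionsCount)
    {
        // Zero partitions: a non-partitioned topic, so one producer on the topic itself.
        if (partitionsCount == 0)
            return new[] { topic };

        var topics = new string[partitionsCount];
        for (uint i = 0; i < partitionsCount; ++i)
            topics[i] = $"{topic}-partition-{i}";

        return topics;
    }
}
```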







[GitHub] [pulsar-dotpulsar] blankensteiner merged pull request #71: Basic partitioned producer

Posted by GitBox <gi...@apache.org>.
blankensteiner merged pull request #71:
URL: https://github.com/apache/pulsar-dotpulsar/pull/71


   





[GitHub] [pulsar-dotpulsar] blankensteiner commented on pull request #71: Basic partitioned producer

Posted by GitBox <gi...@apache.org>.
blankensteiner commented on pull request #71:
URL: https://github.com/apache/pulsar-dotpulsar/pull/71#issuecomment-863779058


   @jiazhai Sure, I'll do that today :-)

