You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by bl...@apache.org on 2021/06/26 06:23:46 UTC
[pulsar-dotpulsar] branch master updated: Refactoring partitioned
producer. Still work in progress (need more resilience and testing)
This is an automated email from the ASF dual-hosted git repository.
blankensteiner pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-dotpulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 28150f3 Refactoring partitioned producer. Still work in progress (need more resilience and testing)
28150f3 is described below
commit 28150f351a2ec65d5edd3b2483acc123ac608d4d
Author: Daniel Blankensteiner <db...@vmail.dk>
AuthorDate: Sat Jun 26 08:23:38 2021 +0200
Refactoring partitioned producer. Still work in progress (need more resilience and testing)
---
src/DotPulsar/Internal/Abstractions/Process.cs | 6 -
src/DotPulsar/Internal/DefaultExceptionHandler.cs | 1 -
.../Events/PartitionedSubProducerStateChanged.cs | 34 ----
src/DotPulsar/Internal/Events/UpdatePartitions.cs | 35 ----
.../Internal/Exceptions/LookupNotReadyException.cs | 23 ---
src/DotPulsar/Internal/Producer.cs | 180 ++++++++++++++-------
src/DotPulsar/Internal/ProducerProcess.cs | 97 +----------
src/DotPulsar/PulsarClient.cs | 59 +------
.../Internal/PartitionedProducerProcessTests.cs | 110 -------------
9 files changed, 132 insertions(+), 413 deletions(-)
diff --git a/src/DotPulsar/Internal/Abstractions/Process.cs b/src/DotPulsar/Internal/Abstractions/Process.cs
index 4c0d3d8..056471a 100644
--- a/src/DotPulsar/Internal/Abstractions/Process.cs
+++ b/src/DotPulsar/Internal/Abstractions/Process.cs
@@ -68,17 +68,11 @@ namespace DotPulsar.Internal.Abstractions
case ChannelUnsubscribed _:
ChannelState = ChannelState.Unsubscribed;
break;
- default:
- HandleExtend(e);
- break;
}
CalculateState();
}
protected abstract void CalculateState();
-
- protected virtual void HandleExtend(IEvent e)
- => throw new NotImplementedException();
}
}
diff --git a/src/DotPulsar/Internal/DefaultExceptionHandler.cs b/src/DotPulsar/Internal/DefaultExceptionHandler.cs
index f29d269..a783827 100644
--- a/src/DotPulsar/Internal/DefaultExceptionHandler.cs
+++ b/src/DotPulsar/Internal/DefaultExceptionHandler.cs
@@ -51,7 +51,6 @@ namespace DotPulsar.Internal
AsyncLockDisposedException _ => FaultAction.Retry,
PulsarStreamDisposedException _ => FaultAction.Retry,
AsyncQueueDisposedException _ => FaultAction.Retry,
- LookupNotReadyException _ => FaultAction.Retry,
OperationCanceledException _ => cancellationToken.IsCancellationRequested ? FaultAction.Rethrow : FaultAction.Retry,
DotPulsarException _ => FaultAction.Rethrow,
SocketException socketException => socketException.SocketErrorCode switch
diff --git a/src/DotPulsar/Internal/Events/PartitionedSubProducerStateChanged.cs b/src/DotPulsar/Internal/Events/PartitionedSubProducerStateChanged.cs
deleted file mode 100644
index 326641d..0000000
--- a/src/DotPulsar/Internal/Events/PartitionedSubProducerStateChanged.cs
+++ /dev/null
@@ -1,34 +0,0 @@
-/*
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-namespace DotPulsar.Internal.Events
-{
- using Abstractions;
- using System;
-
- /// <summary>
- /// Representation of the sub producer of a partitioned producer state change.
- /// </summary>
- public sealed class PartitionedSubProducerStateChanged : IEvent
- {
- public Guid CorrelationId { get; }
- public ProducerState ProducerState { get; }
-
- public PartitionedSubProducerStateChanged(Guid correlationId, ProducerState producerState)
- {
- CorrelationId = correlationId;
- ProducerState = producerState;
- }
- }
-}
diff --git a/src/DotPulsar/Internal/Events/UpdatePartitions.cs b/src/DotPulsar/Internal/Events/UpdatePartitions.cs
deleted file mode 100644
index b9894b5..0000000
--- a/src/DotPulsar/Internal/Events/UpdatePartitions.cs
+++ /dev/null
@@ -1,35 +0,0 @@
-/*
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-namespace DotPulsar.Internal.Events
-{
- using Abstractions;
- using System;
-
- /// <summary>
- /// Representation of the partitions count of the partitioned topic updating.
- /// </summary>
- public sealed class UpdatePartitions : IEvent
- {
- public Guid CorrelationId { get; }
-
- public uint PartitionsCount { get; }
-
- public UpdatePartitions(Guid correlationId, uint partitionsCount)
- {
- CorrelationId = correlationId;
- PartitionsCount = partitionsCount;
- }
- }
-}
diff --git a/src/DotPulsar/Internal/Exceptions/LookupNotReadyException.cs b/src/DotPulsar/Internal/Exceptions/LookupNotReadyException.cs
deleted file mode 100644
index 13fc562..0000000
--- a/src/DotPulsar/Internal/Exceptions/LookupNotReadyException.cs
+++ /dev/null
@@ -1,23 +0,0 @@
-/*
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-namespace DotPulsar.Internal.Exceptions
-{
- using DotPulsar.Exceptions;
-
- public sealed class LookupNotReadyException : DotPulsarException
- {
- public LookupNotReadyException() : base("The topic lookup operation is not ready yet") { }
- }
-}
diff --git a/src/DotPulsar/Internal/Producer.cs b/src/DotPulsar/Internal/Producer.cs
index d3c3495..7bce3be 100644
--- a/src/DotPulsar/Internal/Producer.cs
+++ b/src/DotPulsar/Internal/Producer.cs
@@ -16,76 +16,150 @@ namespace DotPulsar.Internal
{
using Abstractions;
using DotPulsar.Abstractions;
- using Events;
- using Exceptions;
+ using DotPulsar.Extensions;
+ using DotPulsar.Internal.Extensions;
+ using DotPulsar.Internal.PulsarApi;
using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;
- public sealed class Producer<TMessage> : IEstablishNewChannel, IProducer<TMessage>
+ public sealed class Producer<TMessage> : IProducer<TMessage>
{
- private readonly Guid _correlationId;
- private readonly IRegisterEvent _eventRegister;
- private readonly IExecute _executor;
- private readonly IStateChanged<ProducerState> _state;
- private readonly PulsarClient _pulsarClient;
+ private readonly StateManager<ProducerState> _state;
+ private readonly IConnectionPool _connectionPool;
+ private readonly IHandleException _exceptionHandler;
+ private readonly ICompressorFactory? _compressorFactory;
private readonly ProducerOptions<TMessage> _options;
+ private readonly ProcessManager _processManager;
private readonly ConcurrentDictionary<int, IProducer<TMessage>> _producers;
private readonly IMessageRouter _messageRouter;
- private readonly CancellationTokenSource _cts = new();
- private int _producersCount;
+ private readonly CancellationTokenSource _cts;
private int _isDisposed;
+ private int _producerCount;
+
public Uri ServiceUrl { get; }
public string Topic { get; }
public Producer(
- Guid correlationId,
Uri serviceUrl,
- string topic,
- IRegisterEvent registerEvent,
- IExecute executor,
- IStateChanged<ProducerState> state,
ProducerOptions<TMessage> options,
- PulsarClient pulsarClient
- )
+ ProcessManager processManager,
+ IHandleException exceptionHandler,
+ IConnectionPool connectionPool,
+ ICompressorFactory? compressorFactory)
{
- _correlationId = correlationId;
+ _state = new StateManager<ProducerState>(ProducerState.Disconnected, ProducerState.Closed, ProducerState.Faulted);
ServiceUrl = serviceUrl;
- Topic = topic;
- _eventRegister = registerEvent;
- _executor = executor;
- _state = state;
+ Topic = options.Topic;
_isDisposed = 0;
_options = options;
- _pulsarClient = pulsarClient;
+ _exceptionHandler = exceptionHandler;
+ _connectionPool = connectionPool;
+ _compressorFactory = compressorFactory;
+ _processManager = processManager;
_messageRouter = options.MessageRouter;
-
- _producers = new ConcurrentDictionary<int, IProducer<TMessage>>(1, 31);
+ _cts = new CancellationTokenSource();
+ _producers = new ConcurrentDictionary<int, IProducer<TMessage>>();
+ _ = Monitor();
}
- private void CreateSubProducers(int startIndex, int count)
+ private async Task Monitor()
{
- if (count == 0)
+ await Task.Yield();
+
+ try
{
- var producer = _pulsarClient.NewSubProducer(Topic, _options, _executor, _correlationId);
- _producers[0] = producer;
- return;
+ var numberOfPartitions = await GetNumberOfPartitions(Topic, _cts.Token).ConfigureAwait(false);
+ var isPartitionedTopic = numberOfPartitions != 0;
+ var monitoringTasks = new Task<ProducerStateChanged>[isPartitionedTopic ? numberOfPartitions : 1];
+
+ var topic = Topic;
+
+ for (var partition = 0; partition < numberOfPartitions; ++partition)
+ {
+ if (isPartitionedTopic)
+ topic = $"{Topic}-partition-{partition}";
+
+ var producer = CreateSubProducer(topic);
+ _ = _producers.TryAdd(partition, producer);
+ monitoringTasks[partition] = producer.StateChangedFrom(ProducerState.Disconnected, _cts.Token).AsTask();
+ }
+
+ Interlocked.Exchange(ref _producerCount, monitoringTasks.Length);
+
+ var connectedProducers = 0;
+
+ while (true)
+ {
+ await Task.WhenAny(monitoringTasks).ConfigureAwait(false);
+
+ for (var i = 0; i < monitoringTasks.Length; ++i)
+ {
+ var task = monitoringTasks[i];
+ if (!task.IsCompleted)
+ continue;
+
+ var state = task.Result.ProducerState;
+ switch (state)
+ {
+ case ProducerState.Connected:
+ ++connectedProducers;
+ break;
+ case ProducerState.Disconnected:
+ --connectedProducers;
+ break;
+ case ProducerState.Faulted:
+ throw new Exception("SubProducer faulted");
+ }
+
+ monitoringTasks[i] = task.Result.Producer.StateChangedFrom(state, _cts.Token).AsTask();
+ }
+
+ if (connectedProducers == 0)
+ _state.SetState(ProducerState.Disconnected);
+ else if (connectedProducers == numberOfPartitions)
+ _state.SetState(ProducerState.Connected);
+ else
+ _state.SetState(ProducerState.PartiallyConnected);
+ }
}
-
- for (var i = startIndex; i < count; ++i)
+ catch
{
- var producer = _pulsarClient.NewSubProducer(Topic, _options, _executor, _correlationId, (uint) i);
- _producers[i] = producer;
+ if (!_cts.IsCancellationRequested)
+ _state.SetState(ProducerState.Faulted);
}
}
- private async void UpdatePartitions(CancellationToken cancellationToken)
+ private SubProducer<TMessage> CreateSubProducer(string topic)
{
- var partitionsCount = (int) await _pulsarClient.GetNumberOfPartitions(Topic, cancellationToken).ConfigureAwait(false);
- _eventRegister.Register(new UpdatePartitions(_correlationId, (uint)partitionsCount));
- CreateSubProducers(_producers.Count, partitionsCount);
- _producersCount = partitionsCount;
+ var correlationId = Guid.NewGuid();
+ var executor = new Executor(correlationId, _processManager, _exceptionHandler);
+ var producerName = _options.ProducerName;
+ var schema = _options.Schema;
+ var initialSequenceId = _options.InitialSequenceId;
+ var factory = new ProducerChannelFactory(correlationId, _processManager, _connectionPool, topic, producerName, schema.SchemaInfo, _compressorFactory);
+ var stateManager = new StateManager<ProducerState>(ProducerState.Disconnected, ProducerState.Closed, ProducerState.Faulted);
+ var initialChannel = new NotReadyChannel<TMessage>();
+ var producer = new SubProducer<TMessage>(correlationId, ServiceUrl, topic, initialSequenceId, _processManager, initialChannel, executor, stateManager, factory, schema);
+ var process = new ProducerProcess(correlationId, stateManager, producer);
+ _processManager.Add(process);
+ process.Start();
+ return producer;
+ }
+
+ private async Task<uint> GetNumberOfPartitions(string topic, CancellationToken cancellationToken)
+ {
+ var connection = await _connectionPool.FindConnectionForTopic(topic, cancellationToken).ConfigureAwait(false);
+ var commandPartitionedMetadata = new CommandPartitionedTopicMetadata { Topic = topic };
+ var response = await connection.Send(commandPartitionedMetadata, cancellationToken).ConfigureAwait(false);
+
+ response.Expect(BaseCommand.Type.PartitionedMetadataResponse);
+
+ if (response.PartitionMetadataResponse.Response == CommandPartitionedTopicMetadataResponse.LookupType.Failed)
+ response.PartitionMetadataResponse.Throw();
+
+ return response.PartitionMetadataResponse.Partitions;
}
public bool IsFinalState()
@@ -94,10 +168,10 @@ namespace DotPulsar.Internal
public bool IsFinalState(ProducerState state)
=> _state.IsFinalState(state);
- public async ValueTask<ProducerState> OnStateChangeTo(ProducerState state, CancellationToken cancellationToken = default)
+ public async ValueTask<ProducerState> OnStateChangeTo(ProducerState state, CancellationToken cancellationToken)
=> await _state.StateChangedTo(state, cancellationToken).ConfigureAwait(false);
- public async ValueTask<ProducerState> OnStateChangeFrom(ProducerState state, CancellationToken cancellationToken = default)
+ public async ValueTask<ProducerState> OnStateChangeFrom(ProducerState state, CancellationToken cancellationToken)
=> await _state.StateChangedFrom(state, cancellationToken).ConfigureAwait(false);
public async ValueTask DisposeAsync()
@@ -113,27 +187,21 @@ namespace DotPulsar.Internal
await producer.DisposeAsync().ConfigureAwait(false);
}
- _eventRegister.Register(new ProducerDisposed(_correlationId));
+ _state.SetState(ProducerState.Closed);
}
- public async Task EstablishNewChannel(CancellationToken cancellationToken)
+ private async ValueTask<int> ChoosePartitions(DotPulsar.MessageMetadata? metadata, CancellationToken cancellationToken)
{
- await _executor.Execute(() => UpdatePartitions(cancellationToken), cancellationToken).ConfigureAwait(false);
- }
+ if (_producerCount == 0)
+ await _state.StateChangedFrom(ProducerState.Disconnected, cancellationToken).ConfigureAwait(false);
- private int ChoosePartitions(MessageMetadata? metadata)
- {
- if (_producers.IsEmpty)
- {
- throw new LookupNotReadyException();
- }
- return _producersCount == 0 ? 0 : _messageRouter.ChoosePartition(metadata, _producersCount);
+ return _messageRouter.ChoosePartition(metadata, _producerCount);
}
- public async ValueTask<MessageId> Send(TMessage message, CancellationToken cancellationToken = default)
- => await _executor.Execute(() => _producers[ChoosePartitions(null)].Send(message, cancellationToken), cancellationToken).ConfigureAwait(false);
+ public async ValueTask<MessageId> Send(TMessage message, CancellationToken cancellationToken)
+ => await _producers[await ChoosePartitions(null, cancellationToken).ConfigureAwait(false)].Send(message, cancellationToken).ConfigureAwait(false);
- public async ValueTask<MessageId> Send(MessageMetadata metadata, TMessage message, CancellationToken cancellationToken = default)
- => await _executor.Execute(() => _producers[ChoosePartitions(metadata)].Send(message, cancellationToken), cancellationToken).ConfigureAwait(false);
+ public async ValueTask<MessageId> Send(DotPulsar.MessageMetadata metadata, TMessage message, CancellationToken cancellationToken)
+ => await _producers[await ChoosePartitions(metadata, cancellationToken).ConfigureAwait(false)].Send(message, cancellationToken).ConfigureAwait(false);
}
}
diff --git a/src/DotPulsar/Internal/ProducerProcess.cs b/src/DotPulsar/Internal/ProducerProcess.cs
index 36834df..7e0b37f 100644
--- a/src/DotPulsar/Internal/ProducerProcess.cs
+++ b/src/DotPulsar/Internal/ProducerProcess.cs
@@ -15,9 +15,7 @@
namespace DotPulsar.Internal
{
using Abstractions;
- using Events;
using System;
- using System.Threading;
using System.Threading.Tasks;
public sealed class ProducerProcess : Process
@@ -25,101 +23,30 @@ namespace DotPulsar.Internal
private readonly IStateManager<ProducerState> _stateManager;
private readonly IEstablishNewChannel _producer;
- // The following variables are only used when this is the process for parent producer.
- private readonly IRegisterEvent _processManager;
- private int _partitionsCount;
- private int _connectedProducersCount;
- private int _initialProducersCount;
-
- // The following variables are only used for sub producer
- private readonly Guid? _partitionedProducerId;
-
public ProducerProcess(
Guid correlationId,
IStateManager<ProducerState> stateManager,
- IEstablishNewChannel producer,
- IRegisterEvent processManager,
- Guid? partitionedProducerId = null) : base(correlationId)
+ IEstablishNewChannel producer) : base(correlationId)
{
_stateManager = stateManager;
_producer = producer;
- _processManager = processManager;
- _partitionedProducerId = partitionedProducerId;
}
public override async ValueTask DisposeAsync()
{
- SetState(ProducerState.Closed);
+ _stateManager.SetState(ProducerState.Closed);
CancellationTokenSource.Cancel();
-
await _producer.DisposeAsync().ConfigureAwait(false);
}
- protected override void HandleExtend(IEvent e)
- {
- switch (e)
- {
- case PartitionedSubProducerStateChanged stateChanged:
- switch (stateChanged.ProducerState)
- {
- case ProducerState.Closed:
- _stateManager.SetState(ProducerState.Closed);
- break;
- case ProducerState.Connected:
- Interlocked.Increment(ref _connectedProducersCount);
- break;
- case ProducerState.Disconnected:
- // When the sub producer is initialized, the Disconnected event will be triggered.
- // So we need to first subtract from _initialProducersCount.
- if (_initialProducersCount == 0)
- Interlocked.Decrement(ref _connectedProducersCount);
- else
- Interlocked.Decrement(ref _initialProducersCount);
- break;
- case ProducerState.Faulted:
- _stateManager.SetState(ProducerState.Faulted);
- break;
- }
-
- break;
- case UpdatePartitions updatePartitions:
- if (updatePartitions.PartitionsCount == 0)
- {
- Interlocked.Exchange(ref _initialProducersCount, 1);
- Interlocked.Exchange(ref _partitionsCount, 1);
- }
- else
- {
- Interlocked.Add(ref _initialProducersCount, (int) updatePartitions.PartitionsCount - _partitionsCount);
- Interlocked.Exchange(ref _partitionsCount, (int) updatePartitions.PartitionsCount);
- }
-
- break;
- }
- }
-
protected override void CalculateState()
{
if (_stateManager.IsFinalState())
return;
- if (!IsSubProducer()) // parent producer process
- {
- if (_connectedProducersCount <= 0)
- _stateManager.SetState(ProducerState.Disconnected);
- else if (_connectedProducersCount == _partitionsCount)
- _stateManager.SetState(ProducerState.Connected);
- else
- _stateManager.SetState(ProducerState.PartiallyConnected);
-
- if (_partitionsCount == 0)
- _ = _producer.EstablishNewChannel(CancellationTokenSource.Token);
- return;
- }
-
if (ExecutorState == ExecutorState.Faulted)
{
- SetState(ProducerState.Faulted);
+ _stateManager.SetState(ProducerState.Faulted);
return;
}
@@ -127,27 +54,13 @@ namespace DotPulsar.Internal
{
case ChannelState.ClosedByServer:
case ChannelState.Disconnected:
- SetState(ProducerState.Disconnected);
+ _stateManager.SetState(ProducerState.Disconnected);
_ = _producer.EstablishNewChannel(CancellationTokenSource.Token);
return;
case ChannelState.Connected:
- SetState(ProducerState.Connected);
+ _stateManager.SetState(ProducerState.Connected);
return;
}
}
-
- /// <summary>
- /// Check if this is the sub producer process of the partitioned producer.
- /// </summary>
- private bool IsSubProducer()
- => _partitionedProducerId.HasValue;
-
- private void SetState(ProducerState state)
- {
- _stateManager.SetState(state);
-
- if (IsSubProducer())
- _processManager.Register(new PartitionedSubProducerStateChanged(_partitionedProducerId!.Value, state));
- }
}
}
diff --git a/src/DotPulsar/PulsarClient.cs b/src/DotPulsar/PulsarClient.cs
index 453ba26..167bed8 100644
--- a/src/DotPulsar/PulsarClient.cs
+++ b/src/DotPulsar/PulsarClient.cs
@@ -20,7 +20,6 @@ namespace DotPulsar
using Exceptions;
using Internal;
using Internal.Abstractions;
- using Internal.Extensions;
using System;
using System.Linq;
using System.Threading;
@@ -58,20 +57,6 @@ namespace DotPulsar
public static IPulsarClientBuilder Builder()
=> new PulsarClientBuilder();
- internal async Task<uint> GetNumberOfPartitions(string topic, CancellationToken cancellationToken)
- {
- var connection = await _connectionPool.FindConnectionForTopic(topic, cancellationToken).ConfigureAwait(false);
- var commandPartitionedMetadata = new CommandPartitionedTopicMetadata() { Topic = topic };
- var response = await connection.Send(commandPartitionedMetadata, cancellationToken).ConfigureAwait(false);
-
- response.Expect(BaseCommand.Type.PartitionedMetadataResponse);
-
- if (response.PartitionMetadataResponse.Response == CommandPartitionedTopicMetadataResponse.LookupType.Failed)
- response.PartitionMetadataResponse.Throw();
-
- return response.PartitionMetadataResponse.Partitions;
- }
-
/// <summary>
/// Create a producer.
/// </summary>
@@ -79,34 +64,7 @@ namespace DotPulsar
{
ThrowIfDisposed();
- var correlationId = Guid.NewGuid();
- var executor = new Executor(correlationId, _processManager, _exceptionHandler);
- var stateManager = new StateManager<ProducerState>(ProducerState.Disconnected, ProducerState.Closed, ProducerState.Faulted);
-
- var producer = new Producer<TMessage>(correlationId, ServiceUrl, options.Topic, _processManager, executor, stateManager, options, this);
-
- if (options.StateChangedHandler is not null)
- _ = StateMonitor.MonitorProducer(producer, options.StateChangedHandler);
- var process = new ProducerProcess(correlationId, stateManager, producer, _processManager);
- _processManager.Add(process);
- process.Start();
- return producer;
- }
-
- /// <summary>
- /// Create a producer internally.
- /// This method is used to create internal producers for partitioned producer.
- /// </summary>
- internal SubProducer<TMessage> NewSubProducer<TMessage>(string topic, ProducerOptions<TMessage> options, IExecute executor, Guid partitionedProducerGuid,
- uint? partitionIndex = null)
- {
- ThrowIfDisposed();
-
ICompressorFactory? compressorFactory = null;
-
- if (partitionIndex.HasValue)
- topic = $"{topic}-partition-{partitionIndex}";
-
if (options.CompressionType != CompressionType.None)
{
var compressionType = (Internal.PulsarApi.CompressionType) options.CompressionType;
@@ -116,22 +74,11 @@ namespace DotPulsar
throw new CompressionException($"Support for {compressionType} compression was not found");
}
- var correlationId = Guid.NewGuid();
-
- var producerName = options.ProducerName;
- var schema = options.Schema;
- var initialSequenceId = options.InitialSequenceId;
-
- var factory = new ProducerChannelFactory(correlationId, _processManager, _connectionPool, topic, producerName, schema.SchemaInfo, compressorFactory);
- var stateManager = new StateManager<ProducerState>(ProducerState.Disconnected, ProducerState.Closed, ProducerState.Faulted);
- var initialChannel = new NotReadyChannel<TMessage>();
- var producer = new SubProducer<TMessage>(correlationId, ServiceUrl, topic, initialSequenceId, _processManager, initialChannel, executor, stateManager, factory, schema);
+ var producer = new Producer<TMessage>(ServiceUrl, options, _processManager, _exceptionHandler, _connectionPool, compressorFactory);
- if (options.StateChangedHandler is not null && !partitionIndex.HasValue) // the StateChangeHandler of the sub producers in partitioned producers should be disabled.
+ if (options.StateChangedHandler is not null)
_ = StateMonitor.MonitorProducer(producer, options.StateChangedHandler);
- var process = new ProducerProcess(correlationId, stateManager, producer, _processManager, partitionedProducerGuid);
- _processManager.Add(process);
- process.Start();
+
return producer;
}
diff --git a/tests/DotPulsar.Tests/Internal/PartitionedProducerProcessTests.cs b/tests/DotPulsar.Tests/Internal/PartitionedProducerProcessTests.cs
deleted file mode 100644
index 476a81a..0000000
--- a/tests/DotPulsar.Tests/Internal/PartitionedProducerProcessTests.cs
+++ /dev/null
@@ -1,110 +0,0 @@
-/*
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-namespace DotPulsar.Tests.Internal
-{
- using Abstractions;
- using DotPulsar.Internal;
- using DotPulsar.Internal.Abstractions;
- using DotPulsar.Internal.Events;
- using NSubstitute;
- using System;
- using System.Collections.Concurrent;
- using System.Collections.Generic;
- using Xunit;
-
- public class PartitionedProducerProcessTests
- {
- [Fact]
- public void TestPartitionedProducerStateManage_WhenSubProducersStateChange_ThenPartitionedProducerStateChangeCorrectly()
- {
- var connectionPool = Substitute.For<IConnectionPool>();
- var establishNewChannel = Substitute.For<IEstablishNewChannel>();
- var producer = Substitute.For<IProducer>();
-
- var processManager = new ProcessManager(connectionPool);
-
- var producerGuids = new Dictionary<uint, Guid>(3);
- var producersGroup = new ConcurrentDictionary<uint, IProducer>(Environment.ProcessorCount, 3);
- var partitionedProducerGuid = Guid.NewGuid();
-
- for (uint i = 0; i < 3; i++)
- {
- var stateManager = new StateManager<ProducerState>(ProducerState.Disconnected, ProducerState.Closed, ProducerState.Faulted);
- var correlationId = Guid.NewGuid();
- var process = new ProducerProcess(correlationId, stateManager, establishNewChannel, processManager, partitionedProducerGuid);
- producerGuids[i] = correlationId;
- producersGroup[i] = producer;
- processManager.Add(process);
- }
-
- var partitionedStateManager = new StateManager<ProducerState>(ProducerState.Disconnected, ProducerState.Closed, ProducerState.Faulted);
-
- var producerProcess = new ProducerProcess(partitionedProducerGuid, partitionedStateManager, establishNewChannel, new ProcessManager(connectionPool));
- processManager.Add(producerProcess);
- processManager.Register(new UpdatePartitions(partitionedProducerGuid, (uint) producersGroup.Count));
-
- // Test initial channel
- processManager.Register(new ChannelDisconnected(producerGuids[0]));
- Assert.Equal(ProducerState.Disconnected, partitionedStateManager.CurrentState);
- processManager.Register(new ChannelDisconnected(producerGuids[1]));
- Assert.Equal(ProducerState.Disconnected, partitionedStateManager.CurrentState);
- processManager.Register(new ChannelDisconnected(producerGuids[2]));
- Assert.Equal(ProducerState.Disconnected, partitionedStateManager.CurrentState);
-
- // Test connect
- Assert.Equal(ProducerState.Disconnected, partitionedStateManager.CurrentState);
- processManager.Register(new ChannelConnected(producerGuids[0]));
- Assert.Equal(ProducerState.PartiallyConnected, partitionedStateManager.CurrentState);
- processManager.Register(new ChannelConnected(producerGuids[1]));
- Assert.Equal(ProducerState.PartiallyConnected, partitionedStateManager.CurrentState);
- processManager.Register(new ChannelConnected(producerGuids[2]));
- Assert.Equal(ProducerState.Connected, partitionedStateManager.CurrentState);
-
- // Test disconnect
- processManager.Register(new ChannelDisconnected(producerGuids[1]));
- Assert.Equal(ProducerState.PartiallyConnected, partitionedStateManager.CurrentState);
-
- // Test reconnect
- processManager.Register(new ChannelConnected(producerGuids[1]));
- Assert.Equal(ProducerState.Connected, partitionedStateManager.CurrentState);
-
- // Test fault
- processManager.Register(new ExecutorFaulted(producerGuids[1]));
- Assert.Equal(ProducerState.Faulted, partitionedStateManager.CurrentState);
- }
-
- [Fact]
- public void TestUpdatePartitions_WhenIncreasePartitions_ThenPartitionedProducerStateChangeCorrectly()
- {
- var connectionPool = Substitute.For<IConnectionPool>();
- var processManager = new ProcessManager(connectionPool);
- var establishNewChannel = Substitute.For<IEstablishNewChannel>();
-
- var guid = Guid.NewGuid();
- var stateManager = new StateManager<ProducerState>(ProducerState.Disconnected, ProducerState.Closed, ProducerState.Faulted);
- var process = new ProducerProcess(guid, stateManager, establishNewChannel, new ProcessManager(connectionPool));
- processManager.Add(process);
- processManager.Register(new UpdatePartitions(guid, 1));
-
- Assert.Equal(ProducerState.Disconnected, stateManager.CurrentState);
- processManager.Register(new PartitionedSubProducerStateChanged(guid, ProducerState.Connected));
- Assert.Equal(ProducerState.Connected, stateManager.CurrentState);
- processManager.Register(new UpdatePartitions(guid, 2));
- Assert.Equal(ProducerState.PartiallyConnected, stateManager.CurrentState);
- processManager.Register(new PartitionedSubProducerStateChanged(guid, ProducerState.Connected));
- Assert.Equal(ProducerState.Connected, stateManager.CurrentState);
- }
- }
-}