You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by bl...@apache.org on 2022/10/24 14:13:52 UTC
[pulsar-dotpulsar] branch master updated: Adding support for concurrent processing of messages via the Process extension method for IConsumer. This needs testing before release.
This is an automated email from the ASF dual-hosted git repository.
blankensteiner pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-dotpulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 11d5a41 Adding support for concurrent processing of messages via the Process extension method for IConsumer. This needs testing before release.
new ce7538f Merge branch 'master' of https://github.com/apache/pulsar-dotpulsar
11d5a41 is described below
commit 11d5a4141ff8ae8ad8910d58df661f4c919db7d7
Author: Daniel Blankensteiner <db...@vmail.dk>
AuthorDate: Mon Oct 24 16:13:26 2022 +0200
Adding support for concurrent processing of messages via the Process extension method for IConsumer. This needs testing before release.
---
CHANGELOG.md | 6 +
samples/Processing/Worker.cs | 6 +-
src/DotPulsar/DotPulsar.csproj | 2 +-
src/DotPulsar/Extensions/ConsumerExtensions.cs | 65 ++------
src/DotPulsar/Internal/MessageProcessor.cs | 211 +++++++++++++++++++++++++
src/DotPulsar/ProcessingOptions.cs | 99 ++++++++++++
tests/DotPulsar.Tests/DotPulsar.Tests.csproj | 2 +-
7 files changed, 335 insertions(+), 56 deletions(-)
diff --git a/CHANGELOG.md b/CHANGELOG.md
index b6b65c5..f808077 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -4,6 +4,12 @@ All notable changes to this project will be documented in this file.
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/) and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).
+## [Unreleased]
+
+### Added
+
+- The 'Process' extension method for IConsumer\<TMessage\> can now process messages in parallel, configured through the new ProcessingOptions (e.g. MaxDegreeOfParallelism)
+
## [2.4.1] - 2022-09-16
### Changed
diff --git a/samples/Processing/Worker.cs b/samples/Processing/Worker.cs
index caa9d54..25ad087 100644
--- a/samples/Processing/Worker.cs
+++ b/samples/Processing/Worker.cs
@@ -36,7 +36,11 @@ public class Worker : BackgroundService
.Topic("persistent://public/default/mytopic")
.Create();
- await consumer.Process(ProcessMessage, cancellationToken);
+ var options = new ProcessingOptions
+ {
+ MaxDegreeOfParallelism = 5
+ };
+ await consumer.Process(ProcessMessage, options, cancellationToken);
}
private ValueTask ProcessMessage(IMessage<string> message, CancellationToken cancellationToken)
diff --git a/src/DotPulsar/DotPulsar.csproj b/src/DotPulsar/DotPulsar.csproj
index c564268..c3ae7b6 100644
--- a/src/DotPulsar/DotPulsar.csproj
+++ b/src/DotPulsar/DotPulsar.csproj
@@ -23,7 +23,7 @@
<ItemGroup>
<PackageReference Include="HashDepot" Version="2.0.3" />
- <PackageReference Include="Microsoft.Extensions.ObjectPool" Version="6.0.9" />
+ <PackageReference Include="Microsoft.Extensions.ObjectPool" Version="6.0.10" />
<PackageReference Include="Microsoft.SourceLink.GitHub" Version="1.1.1" PrivateAssets="All" />
<PackageReference Include="protobuf-net" Version="3.1.22" />
<PackageReference Include="System.IO.Pipelines" Version="6.0.3" />
diff --git a/src/DotPulsar/Extensions/ConsumerExtensions.cs b/src/DotPulsar/Extensions/ConsumerExtensions.cs
index ca548b8..a2581e7 100644
--- a/src/DotPulsar/Extensions/ConsumerExtensions.cs
+++ b/src/DotPulsar/Extensions/ConsumerExtensions.cs
@@ -16,10 +16,7 @@ namespace DotPulsar.Extensions;
using DotPulsar.Abstractions;
using DotPulsar.Internal;
-using DotPulsar.Internal.Extensions;
using System;
-using System.Collections.Generic;
-using System.Diagnostics;
using System.Threading;
using System.Threading.Tasks;
@@ -46,60 +43,22 @@ public static class ConsumerExtensions
public static async ValueTask Process<TMessage>(
this IConsumer<TMessage> consumer,
Func<IMessage<TMessage>, CancellationToken, ValueTask> processor,
+ ProcessingOptions options,
CancellationToken cancellationToken = default)
{
- const string operation = "process";
- var operationName = $"{consumer.Topic} {operation}";
-
- var activityTags = new KeyValuePair<string, object?>[]
- {
- new KeyValuePair<string, object?>("messaging.destination", consumer.Topic),
- new KeyValuePair<string, object?>("messaging.destination_kind", "topic"),
- new KeyValuePair<string, object?>("messaging.operation", operation),
- new KeyValuePair<string, object?>("messaging.system", "pulsar"),
- new KeyValuePair<string, object?>("messaging.url", consumer.ServiceUrl),
- new KeyValuePair<string, object?>("messaging.pulsar.subscription", consumer.SubscriptionName)
- };
-
- var meterTags = new KeyValuePair<string, object?>[]
- {
- new KeyValuePair<string, object?>("topic", consumer.Topic),
- new KeyValuePair<string, object?>("subscription", consumer.SubscriptionName)
- };
-
- while (!cancellationToken.IsCancellationRequested)
- {
- var message = await consumer.Receive(cancellationToken).ConfigureAwait(false);
-
- var activity = DotPulsarActivitySource.StartConsumerActivity(message, operationName, activityTags);
- if (activity is not null && activity.IsAllDataRequested)
- {
- activity.SetMessageId(message.MessageId);
- activity.SetPayloadSize(message.Data.Length);
- activity.SetStatus(ActivityStatusCode.Ok);
- }
-
- var startTimestamp = DotPulsarMeter.MessageProcessedEnabled ? Stopwatch.GetTimestamp() : 0;
-
- try
- {
- await processor(message, cancellationToken).ConfigureAwait(false);
- }
- catch (Exception exception)
- {
- if (activity is not null && activity.IsAllDataRequested)
- activity.AddException(exception);
- }
-
- if (startTimestamp != 0)
- DotPulsarMeter.MessageProcessed(startTimestamp, meterTags);
-
- activity?.Dispose();
-
- await consumer.Acknowledge(message.MessageId, cancellationToken).ConfigureAwait(false);
- }
+ // The receive/process/acknowledge loop (tracing, metrics, acknowledgement) now lives in MessageProcessor.
+ // 'using' disposes it once Process completes, which releases the semaphores its worker tasks share.
+ // NOTE(review): 'options' is not null-checked here; a null value fails inside the MessageProcessor constructor.
+ using var messageProcessor = new MessageProcessor<TMessage>(consumer, processor, options);
+ await messageProcessor.Process(cancellationToken).ConfigureAwait(false);
}
+    /// <summary>
+    /// Process and auto-acknowledge messages using a freshly created <see cref="ProcessingOptions"/>,
+    /// i.e. one worker task with no limit on messages per task, scheduled on <see cref="TaskScheduler.Default"/>.
+    /// </summary>
+    /// <param name="consumer">The consumer to receive messages from.</param>
+    /// <param name="processor">The delegate invoked for every received message.</param>
+    /// <param name="cancellationToken">Stops processing when cancelled.</param>
+    public static async ValueTask Process<TMessage>(
+        this IConsumer<TMessage> consumer,
+        Func<IMessage<TMessage>, CancellationToken, ValueTask> processor,
+        CancellationToken cancellationToken = default)
+    {
+        var defaultOptions = new ProcessingOptions();
+        await consumer.Process(processor, defaultOptions, cancellationToken).ConfigureAwait(false);
+    }
+
/// <summary>
/// Wait for the state to change to a specific state.
/// </summary>
diff --git a/src/DotPulsar/Internal/MessageProcessor.cs b/src/DotPulsar/Internal/MessageProcessor.cs
new file mode 100644
index 0000000..c3355f4
--- /dev/null
+++ b/src/DotPulsar/Internal/MessageProcessor.cs
@@ -0,0 +1,211 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace DotPulsar.Internal;
+
+using DotPulsar.Abstractions;
+using DotPulsar.Internal.Extensions;
+using Microsoft.Extensions.ObjectPool;
+using System;
+using System.Collections.Concurrent;
+using System.Collections.Generic;
+using System.Diagnostics;
+using System.Threading;
+using System.Threading.Tasks;
+
+/// <summary>
+/// Receives, processes and acknowledges messages from an <see cref="IConsumer{TMessage}"/> using up to
+/// <see cref="ProcessingOptions.MaxDegreeOfParallelism"/> concurrent worker tasks.
+/// </summary>
+/// <remarks>
+/// When ordered acknowledgement is enabled and more than one worker runs, received messages are queued in
+/// receive order. Only the processed messages at the head of that queue are acknowledged, cumulatively when
+/// there is more than one, so an acknowledgement never runs ahead of a message that is still being processed.
+/// </remarks>
+public sealed class MessageProcessor<TMessage> : IDisposable
+{
+    private readonly string _operationName;
+    private readonly KeyValuePair<string, object?>[] _activityTags;
+    private readonly KeyValuePair<string, object?>[] _meterTags;
+    private readonly IConsumer<TMessage> _consumer;
+    private readonly Func<IMessage<TMessage>, CancellationToken, ValueTask> _processor;
+    private readonly LinkedList<Task> _processorTasks;
+    private readonly ConcurrentQueue<ProcessInfo> _processingQueue;
+    private readonly SemaphoreSlim _receiveLock;
+    private readonly SemaphoreSlim _acknowledgeLock;
+    private readonly ObjectPool<ProcessInfo> _processInfoPool;
+    private readonly bool _ensureOrderedAcknowledgement;
+    private readonly int _maxDegreeOfParallelism;
+    private readonly int _maxMessagesPerTask;
+    private readonly TaskScheduler _taskScheduler;
+
+    /// <summary>
+    /// Creates a processor for <paramref name="consumer"/>.
+    /// </summary>
+    /// <param name="consumer">The consumer to receive messages from and acknowledge them on.</param>
+    /// <param name="processor">
+    /// Invoked for every received message. An exception it throws is added to the tracing activity
+    /// (when one exists and all data is requested), and the message is acknowledged anyway.
+    /// </param>
+    /// <param name="options">The processing options. Their values are copied here; later changes have no effect.</param>
+    public MessageProcessor(IConsumer<TMessage> consumer, Func<IMessage<TMessage>, CancellationToken, ValueTask> processor, ProcessingOptions options)
+    {
+        const string operation = "process";
+        _operationName = $"{consumer.Topic} {operation}";
+
+        _activityTags = new KeyValuePair<string, object?>[]
+        {
+            new KeyValuePair<string, object?>("messaging.destination", consumer.Topic),
+            new KeyValuePair<string, object?>("messaging.destination_kind", "topic"),
+            new KeyValuePair<string, object?>("messaging.operation", operation),
+            new KeyValuePair<string, object?>("messaging.system", "pulsar"),
+            new KeyValuePair<string, object?>("messaging.url", consumer.ServiceUrl),
+            new KeyValuePair<string, object?>("messaging.pulsar.subscription", consumer.SubscriptionName)
+        };
+
+        _meterTags = new KeyValuePair<string, object?>[]
+        {
+            new KeyValuePair<string, object?>("topic", consumer.Topic),
+            new KeyValuePair<string, object?>("subscription", consumer.SubscriptionName)
+        };
+
+        _consumer = consumer;
+        _processor = processor;
+        _processorTasks = new LinkedList<Task>();
+        _processingQueue = new ConcurrentQueue<ProcessInfo>();
+        _receiveLock = new SemaphoreSlim(1, 1);
+        _acknowledgeLock = new SemaphoreSlim(1, 1);
+        _processInfoPool = new DefaultObjectPool<ProcessInfo>(new DefaultPooledObjectPolicy<ProcessInfo>());
+
+        _ensureOrderedAcknowledgement = options.EnsureOrderedAcknowledgement;
+        _maxDegreeOfParallelism = options.MaxDegreeOfParallelism;
+        _maxMessagesPerTask = options.MaxMessagesPerTask;
+        _taskScheduler = options.TaskScheduler;
+    }
+
+    /// <summary>
+    /// Disposes the receive and acknowledge semaphores. The worker tasks use them, so only call this
+    /// after <see cref="Process"/> has completed.
+    /// </summary>
+    public void Dispose()
+    {
+        _receiveLock.Dispose();
+        _acknowledgeLock.Dispose();
+    }
+
+    /// <summary>
+    /// Runs worker tasks until cancellation is requested, the consumer reaches a final state, or a worker faults.
+    /// </summary>
+    /// <remarks>
+    /// Returns normally on cancellation. Before returning, every remaining worker is cancelled and awaited,
+    /// so none of them is still using the semaphores when <see cref="Dispose"/> runs. If a worker faulted,
+    /// its exception is rethrown after the others have stopped.
+    /// </remarks>
+    /// <param name="cancellationToken">Stops processing when cancelled.</param>
+    public async ValueTask Process(CancellationToken cancellationToken)
+    {
+        // Linked so the workers can be stopped however this method exits, not only when the caller cancels.
+        using var workerCancellation = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);
+        var workerToken = workerCancellation.Token;
+        Task? faultedTask = null;
+
+        try
+        {
+            for (var i = 1; i < _maxDegreeOfParallelism; ++i)
+            {
+                StartNewProcessorTask(workerToken);
+            }
+
+            while (!workerToken.IsCancellationRequested && !_consumer.IsFinalState())
+            {
+                // Top the workers back up to MaxDegreeOfParallelism, then wait for one to finish
+                // (MaxMessagesPerTask reached, cancelled, or faulted).
+                StartNewProcessorTask(workerToken);
+                var completedTask = await Task.WhenAny(_processorTasks).ConfigureAwait(false);
+                _processorTasks.Remove(completedTask);
+
+                if (completedTask.IsFaulted)
+                {
+                    faultedTask = completedTask;
+                    break;
+                }
+            }
+        }
+        finally
+        {
+            workerCancellation.Cancel();
+
+            foreach (var task in _processorTasks)
+            {
+                try
+                {
+                    await task.ConfigureAwait(false);
+                }
+                catch (OperationCanceledException)
+                {
+                    // Expected: the worker was stopped by the cancellation above.
+                }
+                catch (Exception)
+                {
+                    faultedTask ??= task;
+                }
+            }
+
+            _processorTasks.Clear();
+        }
+
+        if (faultedTask is not null)
+            await faultedTask.ConfigureAwait(false); // Rethrows the worker's exception.
+    }
+
+    /// <summary>
+    /// Worker loop. Receives, processes and acknowledges messages until cancelled, or until
+    /// MaxMessagesPerTask messages have been handled (unless it is <see cref="ProcessingOptions.Unbounded"/>).
+    /// </summary>
+    private async ValueTask Processor(CancellationToken cancellationToken)
+    {
+        var messagesProcessed = 0;
+
+        // With a single worker, messages already complete in receive order, so no bookkeeping is needed.
+        var needToEnsureOrderedAcknowledgement = _ensureOrderedAcknowledgement && _maxDegreeOfParallelism > 1;
+        var isUnbounded = _maxMessagesPerTask == ProcessingOptions.Unbounded;
+
+        while (!cancellationToken.IsCancellationRequested)
+        {
+            IMessage<TMessage> message;
+            ProcessInfo? processInfo = null;
+
+            if (needToEnsureOrderedAcknowledgement)
+            {
+                processInfo = _processInfoPool.Get();
+
+                // Receiving and enqueueing under one lock keeps the queue in receive order.
+                await _receiveLock.WaitAsync(cancellationToken).ConfigureAwait(false);
+                try
+                {
+                    message = await _consumer.Receive(cancellationToken).ConfigureAwait(false);
+                    processInfo.MessageId = message.MessageId;
+                    processInfo.IsProcessed = false;
+                    _processingQueue.Enqueue(processInfo);
+                }
+                finally
+                {
+                    // Always release; a failed Receive must not block every other worker forever.
+                    _receiveLock.Release();
+                }
+            }
+            else
+            {
+                message = await _consumer.Receive(cancellationToken).ConfigureAwait(false);
+            }
+
+            var activity = DotPulsarActivitySource.StartConsumerActivity(message, _operationName, _activityTags);
+            if (activity is not null && activity.IsAllDataRequested)
+            {
+                activity.SetMessageId(message.MessageId);
+                activity.SetPayloadSize(message.Data.Length);
+                activity.SetStatus(ActivityStatusCode.Ok);
+            }
+
+            var startTimestamp = DotPulsarMeter.MessageProcessedEnabled ? Stopwatch.GetTimestamp() : 0;
+
+            try
+            {
+                await _processor(message, cancellationToken).ConfigureAwait(false);
+            }
+            catch (Exception exception)
+            {
+                // Deliberately not rethrown: the failure is recorded on the activity and the message is still acknowledged.
+                if (activity is not null && activity.IsAllDataRequested)
+                    activity.AddException(exception);
+            }
+
+            if (startTimestamp != 0)
+                DotPulsarMeter.MessageProcessed(startTimestamp, _meterTags);
+
+            activity?.Dispose();
+
+            if (processInfo is not null)
+                await AcknowledgeInOrder(processInfo, cancellationToken).ConfigureAwait(false);
+            else
+                await _consumer.Acknowledge(message.MessageId).ConfigureAwait(false);
+
+            if (!isUnbounded && ++messagesProcessed == _maxMessagesPerTask)
+                return;
+        }
+    }
+
+    /// <summary>
+    /// Marks <paramref name="processInfo"/> as processed, then acknowledges the run of processed messages
+    /// at the head of the queue: a single acknowledgement for one, a cumulative one for several.
+    /// </summary>
+    private async ValueTask AcknowledgeInOrder(ProcessInfo processInfo, CancellationToken cancellationToken)
+    {
+        await _acknowledgeLock.WaitAsync(cancellationToken).ConfigureAwait(false);
+        try
+        {
+            processInfo.IsProcessed = true;
+            var messagesToAcknowledge = 0;
+            MessageId? messageId = null;
+
+            // Stop at the first unprocessed message; anything behind it waits for its predecessors.
+            while (_processingQueue.TryPeek(out var head) && head.IsProcessed)
+            {
+                ++messagesToAcknowledge;
+
+                if (_processingQueue.TryDequeue(out var dequeued))
+                {
+                    messageId = dequeued.MessageId;
+                    _processInfoPool.Return(dequeued);
+                }
+            }
+
+            if (messagesToAcknowledge == 1)
+                await _consumer.Acknowledge(messageId!).ConfigureAwait(false);
+            else if (messagesToAcknowledge > 1)
+                await _consumer.AcknowledgeCumulative(messageId!).ConfigureAwait(false);
+        }
+        finally
+        {
+            // Always release; a failed acknowledgement must not block every other worker forever.
+            _acknowledgeLock.Release();
+        }
+    }
+
+    /// <summary>
+    /// Starts a worker on the configured <see cref="TaskScheduler"/> and records it in <c>_processorTasks</c>.
+    /// </summary>
+    private void StartNewProcessorTask(CancellationToken cancellationToken)
+    {
+        var processorTask = Task.Factory.StartNew(
+            async () => await Processor(cancellationToken).ConfigureAwait(false),
+            cancellationToken,
+            TaskCreationOptions.None,
+            _taskScheduler).Unwrap();
+
+        _processorTasks.AddLast(processorTask);
+    }
+
+    /// <summary>
+    /// Pooled bookkeeping for a received message: its id and whether processing has finished.
+    /// Used to acknowledge messages in receive order.
+    /// </summary>
+    private sealed class ProcessInfo
+    {
+        public ProcessInfo()
+        {
+            MessageId = MessageId.Earliest;
+            IsProcessed = false;
+        }
+
+        public MessageId MessageId { get; set; }
+        public bool IsProcessed { get; set; }
+    }
+}
diff --git a/src/DotPulsar/ProcessingOptions.cs b/src/DotPulsar/ProcessingOptions.cs
new file mode 100644
index 0000000..d01a77c
--- /dev/null
+++ b/src/DotPulsar/ProcessingOptions.cs
@@ -0,0 +1,99 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace DotPulsar;
+
+using System;
+using System.Threading.Tasks;
+
+/// <summary>
+/// Options controlling how the 'Process' extension method for IConsumer receives, processes and acknowledges messages.
+/// </summary>
+public sealed class ProcessingOptions
+{
+    /// <summary>
+    /// Sentinel for <see cref="MaxMessagesPerTask"/> meaning a task may process an unlimited number of messages.
+    /// </summary>
+    public const int Unbounded = -1;
+
+    private int _maxDegreeOfParallelism;
+    private int _maxMessagesPerTask;
+    private TaskScheduler _taskScheduler;
+
+    /// <summary>
+    /// Initializes a new instance with the defaults: ordered acknowledgement on, a parallelism of 1,
+    /// <see cref="Unbounded"/> messages per task, and <see cref="TaskScheduler.Default"/>.
+    /// </summary>
+    public ProcessingOptions()
+    {
+        _taskScheduler = TaskScheduler.Default;
+        _maxMessagesPerTask = Unbounded;
+        _maxDegreeOfParallelism = 1;
+        EnsureOrderedAcknowledgement = true;
+    }
+
+    /// <summary>
+    /// Whether acknowledgements must follow the order in which messages were received. The default is 'true'.
+    /// Only relevant when <see cref="MaxDegreeOfParallelism"/> is greater than 1.
+    /// </summary>
+    public bool EnsureOrderedAcknowledgement { get; set; }
+
+    /// <summary>
+    /// The maximum number of messages processed concurrently. Must be at least 1. The default is 1.
+    /// </summary>
+    /// <exception cref="ArgumentOutOfRangeException">The value is less than 1.</exception>
+    public int MaxDegreeOfParallelism
+    {
+        get => _maxDegreeOfParallelism;
+        set => _maxDegreeOfParallelism = value >= 1
+            ? value
+            : throw new ArgumentOutOfRangeException(nameof(value));
+    }
+
+    /// <summary>
+    /// The maximum number of messages a single task processes before it is replaced by a new one.
+    /// Must be at least 1, or <see cref="Unbounded"/> (-1, the default) for no limit.
+    /// </summary>
+    /// <exception cref="ArgumentOutOfRangeException">The value is less than 1 and not <see cref="Unbounded"/>.</exception>
+    public int MaxMessagesPerTask
+    {
+        get => _maxMessagesPerTask;
+        set => _maxMessagesPerTask = value >= 1 || value == Unbounded
+            ? value
+            : throw new ArgumentOutOfRangeException(nameof(value));
+    }
+
+    /// <summary>
+    /// The scheduler the processing tasks are started on. The default is <see cref="TaskScheduler.Default"/>.
+    /// </summary>
+    /// <exception cref="ArgumentNullException">The value is null.</exception>
+    public TaskScheduler TaskScheduler
+    {
+        get => _taskScheduler;
+        set => _taskScheduler = value ?? throw new ArgumentNullException(nameof(value));
+    }
+}
diff --git a/tests/DotPulsar.Tests/DotPulsar.Tests.csproj b/tests/DotPulsar.Tests/DotPulsar.Tests.csproj
index 0e5832b..e9ae20c 100644
--- a/tests/DotPulsar.Tests/DotPulsar.Tests.csproj
+++ b/tests/DotPulsar.Tests/DotPulsar.Tests.csproj
@@ -8,7 +8,7 @@
<ItemGroup>
<PackageReference Include="DotNetZip" Version="1.16.0" />
<PackageReference Include="Ductus.FluentDocker" Version="2.10.57" />
- <PackageReference Include="FluentAssertions" Version="6.7.0" />
+ <PackageReference Include="FluentAssertions" Version="6.8.0" />
<PackageReference Include="IronSnappy" Version="1.3.0" />
<PackageReference Include="K4os.Compression.LZ4" Version="1.2.16" />
<PackageReference Include="NSubstitute" Version="4.4.0" />