You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by bl...@apache.org on 2022/10/24 14:13:52 UTC

[pulsar-dotpulsar] branch master updated: Adding support for concurrent processing of messages via the Process extension method for IConsumer. This needs testing before release.

This is an automated email from the ASF dual-hosted git repository.

blankensteiner pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-dotpulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 11d5a41  Adding support for concurrent processing of messages via the Process extension method for IConsumer. This needs testing before release.
     new ce7538f  Merge branch 'master' of https://github.com/apache/pulsar-dotpulsar
11d5a41 is described below

commit 11d5a4141ff8ae8ad8910d58df661f4c919db7d7
Author: Daniel Blankensteiner <db...@vmail.dk>
AuthorDate: Mon Oct 24 16:13:26 2022 +0200

    Adding support for concurrent processing of messages via the Process extension method for IConsumer. This needs testing before release.
---
 CHANGELOG.md                                   |   6 +
 samples/Processing/Worker.cs                   |   6 +-
 src/DotPulsar/DotPulsar.csproj                 |   2 +-
 src/DotPulsar/Extensions/ConsumerExtensions.cs |  65 ++------
 src/DotPulsar/Internal/MessageProcessor.cs     | 211 +++++++++++++++++++++++++
 src/DotPulsar/ProcessingOptions.cs             |  99 ++++++++++++
 tests/DotPulsar.Tests/DotPulsar.Tests.csproj   |   2 +-
 7 files changed, 335 insertions(+), 56 deletions(-)

diff --git a/CHANGELOG.md b/CHANGELOG.md
index b6b65c5..f808077 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -4,6 +4,12 @@ All notable changes to this project will be documented in this file.
 
 The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/) and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).
 
+## [Unreleased]
+
+### Added
+
+- The 'Process' extension method for IConsumer\<TMessage\> can now be used for parallel processing of messages
+
 ## [2.4.1] - 2022-09-16
 
 ### Changed
diff --git a/samples/Processing/Worker.cs b/samples/Processing/Worker.cs
index caa9d54..25ad087 100644
--- a/samples/Processing/Worker.cs
+++ b/samples/Processing/Worker.cs
@@ -36,7 +36,11 @@ public class Worker : BackgroundService
             .Topic("persistent://public/default/mytopic")
             .Create();
 
-        await consumer.Process(ProcessMessage, cancellationToken);
+        var options = new ProcessingOptions
+        {
+            MaxDegreeOfParallelism = 5
+        };
+        await consumer.Process(ProcessMessage, options, cancellationToken);
     }
 
     private ValueTask ProcessMessage(IMessage<string> message, CancellationToken cancellationToken)
diff --git a/src/DotPulsar/DotPulsar.csproj b/src/DotPulsar/DotPulsar.csproj
index c564268..c3ae7b6 100644
--- a/src/DotPulsar/DotPulsar.csproj
+++ b/src/DotPulsar/DotPulsar.csproj
@@ -23,7 +23,7 @@
 
   <ItemGroup>    
     <PackageReference Include="HashDepot" Version="2.0.3" />
-    <PackageReference Include="Microsoft.Extensions.ObjectPool" Version="6.0.9" />
+    <PackageReference Include="Microsoft.Extensions.ObjectPool" Version="6.0.10" />
     <PackageReference Include="Microsoft.SourceLink.GitHub" Version="1.1.1" PrivateAssets="All" />
     <PackageReference Include="protobuf-net" Version="3.1.22" />
     <PackageReference Include="System.IO.Pipelines" Version="6.0.3" />
diff --git a/src/DotPulsar/Extensions/ConsumerExtensions.cs b/src/DotPulsar/Extensions/ConsumerExtensions.cs
index ca548b8..a2581e7 100644
--- a/src/DotPulsar/Extensions/ConsumerExtensions.cs
+++ b/src/DotPulsar/Extensions/ConsumerExtensions.cs
@@ -16,10 +16,7 @@ namespace DotPulsar.Extensions;
 
 using DotPulsar.Abstractions;
 using DotPulsar.Internal;
-using DotPulsar.Internal.Extensions;
 using System;
-using System.Collections.Generic;
-using System.Diagnostics;
 using System.Threading;
 using System.Threading.Tasks;
 
@@ -46,60 +43,22 @@ public static class ConsumerExtensions
     public static async ValueTask Process<TMessage>(
         this IConsumer<TMessage> consumer,
         Func<IMessage<TMessage>, CancellationToken, ValueTask> processor,
+        ProcessingOptions options,
         CancellationToken cancellationToken = default)
     {
-        const string operation = "process";
-        var operationName = $"{consumer.Topic} {operation}";
-
-        var activityTags = new KeyValuePair<string, object?>[]
-        {
-            new KeyValuePair<string, object?>("messaging.destination", consumer.Topic),
-            new KeyValuePair<string, object?>("messaging.destination_kind", "topic"),
-            new KeyValuePair<string, object?>("messaging.operation", operation),
-            new KeyValuePair<string, object?>("messaging.system", "pulsar"),
-            new KeyValuePair<string, object?>("messaging.url", consumer.ServiceUrl),
-            new KeyValuePair<string, object?>("messaging.pulsar.subscription", consumer.SubscriptionName)
-        };
-
-        var meterTags = new KeyValuePair<string, object?>[]
-        {
-            new KeyValuePair<string, object?>("topic", consumer.Topic),
-            new KeyValuePair<string, object?>("subscription", consumer.SubscriptionName)
-        };
-
-        while (!cancellationToken.IsCancellationRequested)
-        {
-            var message = await consumer.Receive(cancellationToken).ConfigureAwait(false);
-
-            var activity = DotPulsarActivitySource.StartConsumerActivity(message, operationName, activityTags);
-            if (activity is not null && activity.IsAllDataRequested)
-            {
-                activity.SetMessageId(message.MessageId);
-                activity.SetPayloadSize(message.Data.Length);
-                activity.SetStatus(ActivityStatusCode.Ok);
-            }
-
-            var startTimestamp = DotPulsarMeter.MessageProcessedEnabled ? Stopwatch.GetTimestamp() : 0;
-
-            try
-            {
-                await processor(message, cancellationToken).ConfigureAwait(false);
-            }
-            catch (Exception exception)
-            {
-                if (activity is not null && activity.IsAllDataRequested)
-                    activity.AddException(exception);
-            }
-
-            if (startTimestamp != 0)
-                DotPulsarMeter.MessageProcessed(startTimestamp, meterTags);
-
-            activity?.Dispose();
-
-            await consumer.Acknowledge(message.MessageId, cancellationToken).ConfigureAwait(false);
-        }
+        using var messageProcessor = new MessageProcessor<TMessage>(consumer, processor, options);
+        await messageProcessor.Process(cancellationToken).ConfigureAwait(false);
     }
 
+    /// <summary>
+    /// Process and auto-acknowledge messages using the default processing options.
+    /// </summary>
+    /// <param name="consumer">The consumer the messages are received from.</param>
+    /// <param name="processor">The callback invoked for each received message.</param>
+    /// <param name="cancellationToken">The token used to stop processing.</param>
+    public static async ValueTask Process<TMessage>(
+        this IConsumer<TMessage> consumer,
+        Func<IMessage<TMessage>, CancellationToken, ValueTask> processor,
+        CancellationToken cancellationToken = default)
+    {
+        // A freshly constructed ProcessingOptions carries the defaults.
+        var defaultOptions = new ProcessingOptions();
+        await Process(consumer, processor, defaultOptions, cancellationToken).ConfigureAwait(false);
+    }
+
     /// <summary>
     /// Wait for the state to change to a specific state.
     /// </summary>
diff --git a/src/DotPulsar/Internal/MessageProcessor.cs b/src/DotPulsar/Internal/MessageProcessor.cs
new file mode 100644
index 0000000..c3355f4
--- /dev/null
+++ b/src/DotPulsar/Internal/MessageProcessor.cs
@@ -0,0 +1,211 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace DotPulsar.Internal;
+
+using DotPulsar.Abstractions;
+using DotPulsar.Internal.Extensions;
+using Microsoft.Extensions.ObjectPool;
+using System;
+using System.Collections.Concurrent;
+using System.Collections.Generic;
+using System.Diagnostics;
+using System.Threading;
+using System.Threading.Tasks;
+
+/// <summary>
+/// Receives messages from a consumer, hands each of them to a user supplied processor and
+/// acknowledges them afterwards, using up to 'MaxDegreeOfParallelism' concurrent worker tasks.
+/// </summary>
+public sealed class MessageProcessor<TMessage> : IDisposable
+{
+    private readonly string _operationName;
+    private readonly KeyValuePair<string, object?>[] _activityTags;
+    private readonly KeyValuePair<string, object?>[] _meterTags;
+    private readonly IConsumer<TMessage> _consumer;
+    private readonly Func<IMessage<TMessage>, CancellationToken, ValueTask> _processor;
+    private readonly LinkedList<Task> _processorTasks;
+    // Messages in the order they were received; only used when ordered acknowledgement is needed.
+    private readonly ConcurrentQueue<ProcessInfo> _processingQueue;
+    // Makes "receive + enqueue" atomic so the queue order matches the receive order.
+    private readonly SemaphoreSlim _receiveLock;
+    // Serializes the "mark as processed + dequeue + acknowledge" step.
+    private readonly SemaphoreSlim _acknowledgeLock;
+    private readonly ObjectPool<ProcessInfo> _processInfoPool;
+    private readonly bool _ensureOrderedAcknowledgement;
+    private readonly int _maxDegreeOfParallelism;
+    private readonly int _maxMessagesPerTask;
+    private readonly TaskScheduler _taskScheduler;
+
+    /// <summary>
+    /// Creates a processor for the given consumer.
+    /// </summary>
+    /// <param name="consumer">The consumer to receive messages from and acknowledge messages on.</param>
+    /// <param name="processor">The callback invoked for every received message.</param>
+    /// <param name="options">The processing options; their values are copied at construction time.</param>
+    public MessageProcessor(IConsumer<TMessage> consumer, Func<IMessage<TMessage>, CancellationToken, ValueTask> processor, ProcessingOptions options)
+    {
+        const string operation = "process";
+        _operationName = $"{consumer.Topic} {operation}";
+
+        _activityTags = new KeyValuePair<string, object?>[]
+        {
+            new KeyValuePair<string, object?>("messaging.destination", consumer.Topic),
+            new KeyValuePair<string, object?>("messaging.destination_kind", "topic"),
+            new KeyValuePair<string, object?>("messaging.operation", operation),
+            new KeyValuePair<string, object?>("messaging.system", "pulsar"),
+            new KeyValuePair<string, object?>("messaging.url", consumer.ServiceUrl),
+            new KeyValuePair<string, object?>("messaging.pulsar.subscription", consumer.SubscriptionName)
+        };
+
+        _meterTags = new KeyValuePair<string, object?>[]
+        {
+            new KeyValuePair<string, object?>("topic", consumer.Topic),
+            new KeyValuePair<string, object?>("subscription", consumer.SubscriptionName)
+        };
+
+        _consumer = consumer;
+        _processor = processor;
+        _processorTasks = new LinkedList<Task>();
+        _processingQueue = new ConcurrentQueue<ProcessInfo>();
+        _receiveLock = new SemaphoreSlim(1, 1);
+        _acknowledgeLock = new SemaphoreSlim(1, 1);
+        _processInfoPool = new DefaultObjectPool<ProcessInfo>(new DefaultPooledObjectPolicy<ProcessInfo>());
+
+        _ensureOrderedAcknowledgement = options.EnsureOrderedAcknowledgement;
+        _maxDegreeOfParallelism = options.MaxDegreeOfParallelism;
+        _maxMessagesPerTask = options.MaxMessagesPerTask;
+        _taskScheduler = options.TaskScheduler;
+    }
+
+    /// <summary>
+    /// Disposes the two semaphores. Process waits for its worker tasks before returning,
+    /// so disposing after Process has completed does not race with running workers.
+    /// </summary>
+    public void Dispose()
+    {
+        _receiveLock.Dispose();
+        _acknowledgeLock.Dispose();
+    }
+
+    /// <summary>
+    /// Keeps 'MaxDegreeOfParallelism' worker tasks running until cancellation is requested or
+    /// the consumer reaches a final state, then stops the remaining workers and waits for them.
+    /// </summary>
+    /// <param name="cancellationToken">The token used to stop processing.</param>
+    public async ValueTask Process(CancellationToken cancellationToken)
+    {
+        // Workers observe a linked token so they can also be stopped when the loop below ends
+        // because the consumer reached a final state (and not because of the caller's token).
+        using var stopSource = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);
+        var workerToken = stopSource.Token;
+
+        try
+        {
+            // Start one task less than wanted; the loop below always starts one more before waiting.
+            for (var i = 1; i < _maxDegreeOfParallelism; ++i)
+            {
+                StartNewProcessorTask(workerToken);
+            }
+
+            while (!cancellationToken.IsCancellationRequested && !_consumer.IsFinalState())
+            {
+                StartNewProcessorTask(workerToken);
+                var completedTask = await Task.WhenAny(_processorTasks).ConfigureAwait(false);
+                _processorTasks.Remove(completedTask);
+            }
+        }
+        finally
+        {
+            // Do not return while workers still use the semaphores: the caller disposes this
+            // instance right after Process completes.
+            stopSource.Cancel();
+
+            try
+            {
+                await Task.WhenAll(_processorTasks).ConfigureAwait(false);
+            }
+            catch (Exception)
+            {
+                // Worker failures (including cancellation) have never been surfaced by Process;
+                // that contract is kept here, the tasks are only awaited for orderly shutdown.
+            }
+
+            _processorTasks.Clear();
+        }
+    }
+
+    /// <summary>
+    /// A single worker: receives, processes and acknowledges messages until cancellation is
+    /// requested or 'MaxMessagesPerTask' messages have been handled.
+    /// </summary>
+    private async ValueTask Processor(CancellationToken cancellationToken)
+    {
+        var messagesProcessed = 0;
+
+        // With a single worker messages are acknowledged in order anyway, so no bookkeeping is needed.
+        var needToEnsureOrderedAcknowledgement = _ensureOrderedAcknowledgement && _maxDegreeOfParallelism > 1;
+        var isUnbounded = _maxMessagesPerTask == ProcessingOptions.Unbounded;
+
+        while (!cancellationToken.IsCancellationRequested)
+        {
+            ProcessInfo? processInfo = null;
+            IMessage<TMessage> message;
+
+            if (needToEnsureOrderedAcknowledgement)
+            {
+                await _receiveLock.WaitAsync(cancellationToken).ConfigureAwait(false);
+
+                try
+                {
+                    message = await _consumer.Receive(cancellationToken).ConfigureAwait(false);
+
+                    processInfo = _processInfoPool.Get();
+                    processInfo.MessageId = message.MessageId;
+                    processInfo.IsProcessed = false;
+                    _processingQueue.Enqueue(processInfo);
+                }
+                finally
+                {
+                    // Always release, otherwise a failing Receive would block all other workers forever.
+                    _receiveLock.Release();
+                }
+            }
+            else
+                message = await _consumer.Receive(cancellationToken).ConfigureAwait(false);
+
+            var activity = DotPulsarActivitySource.StartConsumerActivity(message, _operationName, _activityTags);
+            if (activity is not null && activity.IsAllDataRequested)
+            {
+                activity.SetMessageId(message.MessageId);
+                activity.SetPayloadSize(message.Data.Length);
+                activity.SetStatus(ActivityStatusCode.Ok);
+            }
+
+            var startTimestamp = DotPulsarMeter.MessageProcessedEnabled ? Stopwatch.GetTimestamp() : 0;
+
+            try
+            {
+                await _processor(message, cancellationToken).ConfigureAwait(false);
+            }
+            catch (Exception exception)
+            {
+                // A failing processor does not stop the worker; the message is still acknowledged below.
+                if (activity is not null && activity.IsAllDataRequested)
+                    activity.AddException(exception);
+            }
+
+            if (startTimestamp != 0)
+                DotPulsarMeter.MessageProcessed(startTimestamp, _meterTags);
+
+            activity?.Dispose();
+
+            if (processInfo is not null)
+            {
+                await _acknowledgeLock.WaitAsync(cancellationToken).ConfigureAwait(false);
+
+                try
+                {
+                    processInfo.IsProcessed = true;
+                    var messagesToAcknowledge = 0;
+                    MessageId? messageId = null;
+
+                    // Drain the processed messages at the head of the queue; stop at the first
+                    // message that another worker is still processing.
+                    while (_processingQueue.TryPeek(out var head) && head.IsProcessed)
+                    {
+                        if (!_processingQueue.TryDequeue(out var dequeued))
+                            break;
+
+                        ++messagesToAcknowledge;
+                        messageId = dequeued.MessageId;
+                        _processInfoPool.Return(dequeued);
+                    }
+
+                    if (messagesToAcknowledge == 1)
+                        await _consumer.Acknowledge(messageId!).ConfigureAwait(false);
+                    else if (messagesToAcknowledge > 1)
+                        await _consumer.AcknowledgeCumulative(messageId!).ConfigureAwait(false);
+                }
+                finally
+                {
+                    // Always release, otherwise a failing acknowledgement would block all other workers forever.
+                    _acknowledgeLock.Release();
+                }
+            }
+            else
+                await _consumer.Acknowledge(message.MessageId).ConfigureAwait(false);
+
+            if (!isUnbounded && ++messagesProcessed == _maxMessagesPerTask)
+                return;
+        }
+    }
+
+    /// <summary>
+    /// Starts a worker on the configured TaskScheduler and tracks it in the list of running tasks.
+    /// </summary>
+    private void StartNewProcessorTask(CancellationToken cancellationToken)
+    {
+        var processorTask = Task.Factory.StartNew(
+            async () => await Processor(cancellationToken).ConfigureAwait(false),
+            cancellationToken,
+            TaskCreationOptions.None,
+            _taskScheduler).Unwrap();
+
+        _processorTasks.AddLast(processorTask);
+    }
+
+    /// <summary>
+    /// Pooled bookkeeping entry for a received message awaiting ordered acknowledgement.
+    /// </summary>
+    private sealed class ProcessInfo
+    {
+        public ProcessInfo()
+        {
+            MessageId = MessageId.Earliest;
+            IsProcessed = false;
+        }
+
+        public MessageId MessageId { get; set; }
+        public bool IsProcessed { get; set; }
+    }
+}
diff --git a/src/DotPulsar/ProcessingOptions.cs b/src/DotPulsar/ProcessingOptions.cs
new file mode 100644
index 0000000..d01a77c
--- /dev/null
+++ b/src/DotPulsar/ProcessingOptions.cs
@@ -0,0 +1,99 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace DotPulsar;
+
+using System;
+using System.Threading.Tasks;
+
+/// <summary>
+/// Options for the 'Process' extension method for IConsumer.
+/// </summary>
+public sealed class ProcessingOptions
+{
+    /// <summary>
+    /// The value used to represent unlimited processing of messages per task.
+    /// </summary>
+    public const int Unbounded = -1;
+
+    private int _maxDegreeOfParallelism = 1;
+    private int _maxMessagesPerTask = Unbounded;
+    private TaskScheduler _taskScheduler = TaskScheduler.Default;
+
+    /// <summary>
+    /// Initializes a new instance with the default values.
+    /// </summary>
+    public ProcessingOptions()
+    {
+    }
+
+    /// <summary>
+    /// Whether ordered acknowledgement should be enforced. The default is 'true'.
+    /// </summary>
+    public bool EnsureOrderedAcknowledgement { get; set; } = true;
+
+    /// <summary>
+    /// The maximum number of messages that may be processed concurrently. The default is 1.
+    /// </summary>
+    /// <exception cref="ArgumentOutOfRangeException">The value is less than 1.</exception>
+    public int MaxDegreeOfParallelism
+    {
+        get => _maxDegreeOfParallelism;
+        set => _maxDegreeOfParallelism = value >= 1
+            ? value
+            : throw new ArgumentOutOfRangeException(nameof(value));
+    }
+
+    /// <summary>
+    /// The maximum number of messages that may be processed per task. The default is -1 (unlimited).
+    /// </summary>
+    /// <exception cref="ArgumentOutOfRangeException">The value is less than 1 and not 'Unbounded'.</exception>
+    public int MaxMessagesPerTask
+    {
+        get => _maxMessagesPerTask;
+        set => _maxMessagesPerTask = value >= 1 || value == Unbounded
+            ? value
+            : throw new ArgumentOutOfRangeException(nameof(value));
+    }
+
+    /// <summary>
+    /// The TaskScheduler to use for scheduling tasks. The default is TaskScheduler.Default.
+    /// </summary>
+    /// <exception cref="ArgumentNullException">The value is null.</exception>
+    public TaskScheduler TaskScheduler
+    {
+        get => _taskScheduler;
+        set => _taskScheduler = value ?? throw new ArgumentNullException(nameof(value));
+    }
+}
diff --git a/tests/DotPulsar.Tests/DotPulsar.Tests.csproj b/tests/DotPulsar.Tests/DotPulsar.Tests.csproj
index 0e5832b..e9ae20c 100644
--- a/tests/DotPulsar.Tests/DotPulsar.Tests.csproj
+++ b/tests/DotPulsar.Tests/DotPulsar.Tests.csproj
@@ -8,7 +8,7 @@
   <ItemGroup>
     <PackageReference Include="DotNetZip" Version="1.16.0" />
     <PackageReference Include="Ductus.FluentDocker" Version="2.10.57" />
-    <PackageReference Include="FluentAssertions" Version="6.7.0" />
+    <PackageReference Include="FluentAssertions" Version="6.8.0" />
     <PackageReference Include="IronSnappy" Version="1.3.0" />
     <PackageReference Include="K4os.Compression.LZ4" Version="1.2.16" />
     <PackageReference Include="NSubstitute" Version="4.4.0" />