You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by bl...@apache.org on 2022/10/31 13:00:50 UTC

[pulsar-dotpulsar] branch master updated: Updated NuGet package and make sure the 'Process' extension method for IConsumer will use the cancellation token when acknowledging messages

This is an automated email from the ASF dual-hosted git repository.

blankensteiner pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-dotpulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 7452ded  Updated NuGet package and make sure the 'Process' extension method for IConsumer will use the cancellation token when acknowledging messages
7452ded is described below

commit 7452ded9b20144aefad123b0f594bdf718287747
Author: BLANKEN\blanken <db...@wisix.net>
AuthorDate: Mon Oct 31 14:00:41 2022 +0100

    Updated NuGet package and make sure the 'Process' extension method for IConsumer will use the cancellation token when acknowledging messages
---
 CHANGELOG.md                                 |  6 ++++
 src/DotPulsar/Internal/MessageProcessor.cs   | 51 +++++++++++++++-------------
 tests/DotPulsar.Tests/DotPulsar.Tests.csproj |  2 +-
 3 files changed, 34 insertions(+), 25 deletions(-)

diff --git a/CHANGELOG.md b/CHANGELOG.md
index 4512e50..e8ea186 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -4,6 +4,12 @@ All notable changes to this project will be documented in this file.
 
 The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/) and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).
 
+## [Unreleased]
+
+### Fixed
+
+- The 'Process' extension method for IConsumer\<TMessage\> will now use the cancellation token when acknowledging messages
+
 ## [2.5.0] - 2022-10-28
 
 ### Added
diff --git a/src/DotPulsar/Internal/MessageProcessor.cs b/src/DotPulsar/Internal/MessageProcessor.cs
index db6ece6..d2bed80 100644
--- a/src/DotPulsar/Internal/MessageProcessor.cs
+++ b/src/DotPulsar/Internal/MessageProcessor.cs
@@ -154,40 +154,43 @@ public sealed class MessageProcessor<TMessage> : IDisposable
             if (needToEnsureOrderedAcknowledgement)
             {
                 await _acknowledgeLock.WaitAsync(cancellationToken).ConfigureAwait(false);
-
                 processInfo.IsProcessed = true;
-                var messagesToAcknowledge = 0;
-                MessageId? messageId = null;
-
-                while (_processingQueue.TryPeek(out processInfo))
-                {
-                    if (!processInfo.IsProcessed)
-                        break;
-
-                    ++messagesToAcknowledge;
-
-                    if (_processingQueue.TryDequeue(out processInfo))
-                    {
-                        messageId = processInfo.MessageId;
-                        _processInfoPool.Return(processInfo);
-                    }
-                }
-
-                if (messagesToAcknowledge == 1)
-                    await _consumer.Acknowledge(messageId!).ConfigureAwait(false);
-                else if (messagesToAcknowledge > 1)
-                    await _consumer.AcknowledgeCumulative(messageId!).ConfigureAwait(false);
-
+                await AcknowledgeProcessedMessages(cancellationToken).ConfigureAwait(false);
                 _acknowledgeLock.Release();
             }
             else
-                await _consumer.Acknowledge(message.MessageId).ConfigureAwait(false);
+                await _consumer.Acknowledge(message.MessageId, cancellationToken).ConfigureAwait(false);
 
             if (!isUnbounded && ++messagesProcessed == _maxMessagesPerTask)
                 return;
         }
     }
 
+    private async ValueTask AcknowledgeProcessedMessages(CancellationToken cancellationToken)
+    {
+        var messagesToAcknowledge = 0;
+        var messageId = MessageId.Earliest;
+
+        while (_processingQueue.TryPeek(out var processInfo))
+        {
+            if (!processInfo.IsProcessed)
+                break;
+
+            ++messagesToAcknowledge;
+
+            if (_processingQueue.TryDequeue(out processInfo))
+            {
+                messageId = processInfo.MessageId;
+                _processInfoPool.Return(processInfo);
+            }
+        }
+
+        if (messagesToAcknowledge == 1)
+            await _consumer.Acknowledge(messageId, cancellationToken).ConfigureAwait(false);
+        else if (messagesToAcknowledge > 1)
+            await _consumer.AcknowledgeCumulative(messageId, cancellationToken).ConfigureAwait(false);
+    }
+
     private void StartNewProcessorTask(CancellationToken cancellationToken)
     {
         var processorTask = Task.Factory.StartNew(
diff --git a/tests/DotPulsar.Tests/DotPulsar.Tests.csproj b/tests/DotPulsar.Tests/DotPulsar.Tests.csproj
index e9ae20c..69ed29a 100644
--- a/tests/DotPulsar.Tests/DotPulsar.Tests.csproj
+++ b/tests/DotPulsar.Tests/DotPulsar.Tests.csproj
@@ -18,7 +18,7 @@
       <PrivateAssets>all</PrivateAssets>
       <IncludeAssets>runtime; build; native; contentfiles; analyzers; buildtransitive</IncludeAssets>
     </PackageReference>
-    <PackageReference Include="coverlet.collector" Version="3.1.2">
+    <PackageReference Include="coverlet.collector" Version="3.2.0">
       <PrivateAssets>all</PrivateAssets>
       <IncludeAssets>runtime; build; native; contentfiles; analyzers; buildtransitive</IncludeAssets>
     </PackageReference>