You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by bl...@apache.org on 2022/10/31 13:00:50 UTC
[pulsar-dotpulsar] branch master updated: Updated NuGet package and make sure the 'Process' extension method for IConsumer will use the cancellation token when acknowledging messages
This is an automated email from the ASF dual-hosted git repository.
blankensteiner pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-dotpulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 7452ded Updated NuGet package and make sure the 'Process' extension method for IConsumer will use the cancellation token when acknowledging messages
7452ded is described below
commit 7452ded9b20144aefad123b0f594bdf718287747
Author: BLANKEN\blanken <db...@wisix.net>
AuthorDate: Mon Oct 31 14:00:41 2022 +0100
Updated NuGet package and make sure the 'Process' extension method for IConsumer will use the cancellation token when acknowledging messages
---
CHANGELOG.md | 6 ++++
src/DotPulsar/Internal/MessageProcessor.cs | 51 +++++++++++++++-------------
tests/DotPulsar.Tests/DotPulsar.Tests.csproj | 2 +-
3 files changed, 34 insertions(+), 25 deletions(-)
diff --git a/CHANGELOG.md b/CHANGELOG.md
index 4512e50..e8ea186 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -4,6 +4,12 @@ All notable changes to this project will be documented in this file.
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/) and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).
+## [Unreleased]
+
+### Fixed
+
+- The 'Process' extension method for IConsumer\<TMessage\> will now use the cancellation token when acknowledging messages
+
## [2.5.0] - 2022-10-28
### Added
diff --git a/src/DotPulsar/Internal/MessageProcessor.cs b/src/DotPulsar/Internal/MessageProcessor.cs
index db6ece6..d2bed80 100644
--- a/src/DotPulsar/Internal/MessageProcessor.cs
+++ b/src/DotPulsar/Internal/MessageProcessor.cs
@@ -154,40 +154,43 @@ public sealed class MessageProcessor<TMessage> : IDisposable
if (needToEnsureOrderedAcknowledgement)
{
await _acknowledgeLock.WaitAsync(cancellationToken).ConfigureAwait(false);
-
processInfo.IsProcessed = true;
- var messagesToAcknowledge = 0;
- MessageId? messageId = null;
-
- while (_processingQueue.TryPeek(out processInfo))
- {
- if (!processInfo.IsProcessed)
- break;
-
- ++messagesToAcknowledge;
-
- if (_processingQueue.TryDequeue(out processInfo))
- {
- messageId = processInfo.MessageId;
- _processInfoPool.Return(processInfo);
- }
- }
-
- if (messagesToAcknowledge == 1)
- await _consumer.Acknowledge(messageId!).ConfigureAwait(false);
- else if (messagesToAcknowledge > 1)
- await _consumer.AcknowledgeCumulative(messageId!).ConfigureAwait(false);
-
+ await AcknowledgeProcessedMessages(cancellationToken).ConfigureAwait(false);
_acknowledgeLock.Release();
}
else
- await _consumer.Acknowledge(message.MessageId).ConfigureAwait(false);
+ await _consumer.Acknowledge(message.MessageId, cancellationToken).ConfigureAwait(false);
if (!isUnbounded && ++messagesProcessed == _maxMessagesPerTask)
return;
}
}
+ private async ValueTask AcknowledgeProcessedMessages(CancellationToken cancellationToken)
+ {
+ var messagesToAcknowledge = 0;
+ var messageId = MessageId.Earliest;
+
+ while (_processingQueue.TryPeek(out var processInfo))
+ {
+ if (!processInfo.IsProcessed)
+ break;
+
+ ++messagesToAcknowledge;
+
+ if (_processingQueue.TryDequeue(out processInfo))
+ {
+ messageId = processInfo.MessageId;
+ _processInfoPool.Return(processInfo);
+ }
+ }
+
+ if (messagesToAcknowledge == 1)
+ await _consumer.Acknowledge(messageId, cancellationToken).ConfigureAwait(false);
+ else if (messagesToAcknowledge > 1)
+ await _consumer.AcknowledgeCumulative(messageId, cancellationToken).ConfigureAwait(false);
+ }
+
private void StartNewProcessorTask(CancellationToken cancellationToken)
{
var processorTask = Task.Factory.StartNew(
diff --git a/tests/DotPulsar.Tests/DotPulsar.Tests.csproj b/tests/DotPulsar.Tests/DotPulsar.Tests.csproj
index e9ae20c..69ed29a 100644
--- a/tests/DotPulsar.Tests/DotPulsar.Tests.csproj
+++ b/tests/DotPulsar.Tests/DotPulsar.Tests.csproj
@@ -18,7 +18,7 @@
<PrivateAssets>all</PrivateAssets>
<IncludeAssets>runtime; build; native; contentfiles; analyzers; buildtransitive</IncludeAssets>
</PackageReference>
- <PackageReference Include="coverlet.collector" Version="3.1.2">
+ <PackageReference Include="coverlet.collector" Version="3.2.0">
<PrivateAssets>all</PrivateAssets>
<IncludeAssets>runtime; build; native; contentfiles; analyzers; buildtransitive</IncludeAssets>
</PackageReference>