Posted to dev@pulsar.apache.org by GitBox <gi...@apache.org> on 2021/01/06 10:03:04 UTC

[GitHub] [pulsar-dotpulsar] blankensteiner commented on a change in pull request #67: Support for (optional) ack timeout and nack delay for consumers

blankensteiner commented on a change in pull request #67:
URL: https://github.com/apache/pulsar-dotpulsar/pull/67#discussion_r552465648



##########
File path: src/DotPulsar/ConsumerOptions.cs
##########
@@ -101,5 +101,16 @@ public ConsumerOptions(string subscriptionName, string topic)
         /// Set the topic for this consumer. This is required.
         /// </summary>
         public string Topic { get; set; }
+
+        /// <summary>
+        /// Delay to wait before redelivering messages that failed to be processed.
+        /// When an application uses IConsumer.NegativeAcknowledge(Message), failed messages are redelivered after a fixed timeout.
+        /// </summary>
+        public int NegativeAckRedeliveryDelayMicros { get; set; }

Review comment:
       Let's make this a TimeSpan and call it "NegativeAcknowledgementRedeliveryDelay"

##########
File path: src/DotPulsar/Internal/Abstractions/IMessageAcksTracker.cs
##########
@@ -0,0 +1,13 @@
+namespace DotPulsar.Internal.Abstractions

Review comment:
       We need the Apache license header at the top of every file

##########
File path: src/DotPulsar/ConsumerOptions.cs
##########
@@ -101,5 +101,16 @@ public ConsumerOptions(string subscriptionName, string topic)
         /// Set the topic for this consumer. This is required.
         /// </summary>
         public string Topic { get; set; }
+
+        /// <summary>
+        /// Delay to wait before redelivering messages that failed to be processed.
+        /// When an application uses IConsumer.NegativeAcknowledge(Message), failed messages are redelivered after a fixed timeout.
+        /// </summary>
+        public int NegativeAckRedeliveryDelayMicros { get; set; }
+
+        /// <summary>
+        /// Timeout of unacked messages
+        /// </summary>
+        public int AckTimeoutMillis { get; set; }

Review comment:
       Let's make this a TimeSpan and call it "AcknowledgementTimeout"
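
A minimal sketch of what the two renamed options might look like as TimeSpan properties. The class name ConsumerOptionsSketch is hypothetical and the rest of ConsumerOptions is omitted; only the property names come from the review comments. Using TimeSpan removes the need for unit suffixes such as "Micros" or "Millis" in the names.

```csharp
using System;

// Hypothetical, trimmed-down stand-in for ConsumerOptions.
// Only the two properties discussed in the review are shown.
public sealed class ConsumerOptionsSketch
{
    /// <summary>
    /// Delay to wait before redelivering messages that were negatively acknowledged.
    /// </summary>
    public TimeSpan NegativeAcknowledgementRedeliveryDelay { get; set; }

    /// <summary>
    /// Timeout for unacknowledged messages.
    /// </summary>
    public TimeSpan AcknowledgementTimeout { get; set; }
}
```

A caller would then write, for example, `new ConsumerOptionsSketch { AcknowledgementTimeout = TimeSpan.FromSeconds(30) }` and the unit is explicit at the call site.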

##########
File path: src/DotPulsar/Internal/Abstractions/IMessageAcksTracker.cs
##########
@@ -0,0 +1,13 @@
+namespace DotPulsar.Internal.Abstractions
+{
+    using DotPulsar.Abstractions;
+    using System.Threading;
+    using System.Threading.Tasks;
+    public interface IMessageAcksTracker<T>

Review comment:
       Let's have a single space between using and class declarations.
   I'm not sure this interface is needed. Also, why do Add, Ack and Nack return T?

##########
File path: src/DotPulsar/Internal/Abstractions/IMessageAcksTracker.cs
##########
@@ -0,0 +1,13 @@
+namespace DotPulsar.Internal.Abstractions
+{
+    using DotPulsar.Abstractions;
+    using System.Threading;
+    using System.Threading.Tasks;
+    public interface IMessageAcksTracker<T>
+    {
+        T Add(T message);
+        T Ack(T message);
+        T Nack(T message);
+        Task StartTracker(IConsumer consumer, CancellationToken cancellationToken);
+    }
+}

Review comment:
       Needs a new line

##########
File path: src/DotPulsar/Internal/Abstractions/IMessageQueue.cs
##########
@@ -0,0 +1,10 @@
+namespace DotPulsar.Internal.Abstractions

Review comment:
       We need the Apache license header at the top of every file

##########
File path: src/DotPulsar/Internal/Abstractions/IUnackedMessageTracker.cs
##########
@@ -0,0 +1,16 @@
+namespace DotPulsar.Internal.Abstractions

Review comment:
       We need the Apache license header at the top of every file

##########
File path: src/DotPulsar/Internal/InactiveMessageAcksTracker.cs
##########
@@ -0,0 +1,30 @@
+namespace DotPulsar.Internal

Review comment:
       We need the Apache license header at the top of every file

##########
File path: src/DotPulsar/Internal/InactiveMessageAcksTracker.cs
##########
@@ -0,0 +1,30 @@
+namespace DotPulsar.Internal
+{
+    using Abstractions;
+    using DotPulsar.Abstractions;
+    using DotPulsar.Exceptions;
+    using Events;
+    using Microsoft.Extensions.ObjectPool;
+    using PulsarApi;
+    using System;
+    using System.Collections.Generic;
+    using System.Linq;
+    using System.Runtime.CompilerServices;
+    using System.Threading;
+    using System.Threading.Tasks;
+    using System.Diagnostics;
+
+    public sealed class InactiveMessageAcksTracker : IMessageAcksTracker<MessageId>
+    {
+        public InactiveMessageAcksTracker() { }

Review comment:
       Empty constructors can be deleted

##########
File path: src/DotPulsar/Internal/Abstractions/IMessageQueue.cs
##########
@@ -0,0 +1,10 @@
+namespace DotPulsar.Internal.Abstractions
+{
+    using System;
+
+    public interface IMessageQueue : IDequeue<MessagePackage>, IDisposable
+    {
+        void Acknowledge(MessageId obj);
+        void NegativeAcknowledge(MessageId obj);
+    }
+}

Review comment:
       Needs a new line

##########
File path: src/DotPulsar/Internal/InactiveUnackedMessageTracker.cs
##########
@@ -0,0 +1,31 @@
+namespace DotPulsar.Internal

Review comment:
       We need the Apache license header at the top of every file

##########
File path: src/DotPulsar/Internal/ConsumerChannel.cs
##########
@@ -108,6 +108,9 @@ public async Task Send(CommandAck command, CancellationToken cancellationToken)
             }
 
             command.ConsumerId = _id;
+
+            _queue.Acknowledge(new MessageId(messageId));

Review comment:
       Some operations need to be as fast as possible. For consumers, this is receiving and acknowledging messages. Let's work with "messageId" instead of having to create a MessageId.

##########
File path: src/DotPulsar/Internal/Abstractions/IMessageQueue.cs
##########
@@ -0,0 +1,10 @@
+namespace DotPulsar.Internal.Abstractions
+{
+    using System;
+
+    public interface IMessageQueue : IDequeue<MessagePackage>, IDisposable
+    {
+        void Acknowledge(MessageId obj);

Review comment:
       Let's call them "messageId" instead of "obj".

##########
File path: src/DotPulsar/Internal/MessageAcksTracker.cs
##########
@@ -0,0 +1,103 @@
+namespace DotPulsar.Internal
+{
+    using Abstractions;
+    using DotPulsar.Abstractions;
+    using DotPulsar.Exceptions;
+    using Events;
+    using Microsoft.Extensions.ObjectPool;
+    using PulsarApi;
+    using System;
+    using System.Collections.Generic;
+    using System.Linq;
+    using System.Runtime.CompilerServices;
+    using System.Threading;
+    using System.Threading.Tasks;
+    using System.Diagnostics;
+
+    internal class Tracker
+    {
+        private readonly Stopwatch _timer;
+        private int maxTimeoutMs;
+
+        public Tracker(int timeoutMs)
+        {
+            maxTimeoutMs = timeoutMs;
+            _timer = new Stopwatch();
+            _timer.Start();
+        }
+
+        public bool IsTimedOut() => _timer.ElapsedMilliseconds > maxTimeoutMs;
+
+        public long msTillTimeout => maxTimeoutMs - _timer.ElapsedMilliseconds;
+
+        public void Reset(int newTimeoutMs)
+        {
+            maxTimeoutMs = newTimeoutMs;
+            _timer.Restart();
+        }
+    }
+
+    // TODO add mechnism to stop tracker when disposed
+    public sealed class MessageAcksTracker : IMessageAcksTracker<MessageId>

Review comment:
       Every class should have its own file

##########
File path: src/DotPulsar/Internal/MessageAcksTracker.cs
##########
@@ -0,0 +1,103 @@
+namespace DotPulsar.Internal
+{
+    using Abstractions;
+    using DotPulsar.Abstractions;
+    using DotPulsar.Exceptions;
+    using Events;
+    using Microsoft.Extensions.ObjectPool;
+    using PulsarApi;
+    using System;
+    using System.Collections.Generic;
+    using System.Linq;
+    using System.Runtime.CompilerServices;
+    using System.Threading;
+    using System.Threading.Tasks;
+    using System.Diagnostics;
+
+    internal class Tracker
+    {
+        private readonly Stopwatch _timer;
+        private int maxTimeoutMs;

Review comment:
       Every field should start with an underscore.

##########
File path: src/DotPulsar/Internal/MessageQueue.cs
##########
@@ -0,0 +1,45 @@
+namespace DotPulsar.Internal
+{
+    using Abstractions;
+    using DotPulsar.Abstractions;
+    using DotPulsar.Exceptions;
+    using Events;
+    using Microsoft.Extensions.ObjectPool;
+    using PulsarApi;
+    using System;
+    using System.Collections.Generic;
+    using System.Linq;
+    using System.Runtime.CompilerServices;
+    using System.Threading;
+    using System.Threading.Tasks;
+
+    public sealed class MessageQueue : IMessageQueue, IDequeue<MessagePackage>, IDisposable
+    {
+        private readonly AsyncQueue<MessagePackage> _queue;
+        private readonly IUnackedMessageTracker _tracker;
+        public MessageQueue(AsyncQueue<MessagePackage> queue, IUnackedMessageTracker tracker)
+        {
+            _queue = queue;
+            _tracker = tracker;
+        }
+        public async ValueTask<MessagePackage> Dequeue(CancellationToken cancellationToken = default)
+        {
+            var message = await _queue.Dequeue(cancellationToken).ConfigureAwait(false);
+            _tracker.Add(new MessageId(message.MessageId));
+            return message;
+        }
+        public void Acknowledge(MessageId obj) => _tracker.Ack(obj);
+
+        public void NegativeAcknowledge(MessageId obj)
+        {
+            throw new NotImplementedException();
+        }
+
+        public void Dispose()
+        {
+            _queue.Dispose();
+            _tracker.Dispose();
+        }
+        
+    }
+}

Review comment:
       Needs a new line

##########
File path: src/DotPulsar/Internal/MessageAcksTracker.cs
##########
@@ -0,0 +1,103 @@
+namespace DotPulsar.Internal
+{
+    using Abstractions;
+    using DotPulsar.Abstractions;
+    using DotPulsar.Exceptions;
+    using Events;
+    using Microsoft.Extensions.ObjectPool;
+    using PulsarApi;
+    using System;
+    using System.Collections.Generic;
+    using System.Linq;
+    using System.Runtime.CompilerServices;
+    using System.Threading;
+    using System.Threading.Tasks;
+    using System.Diagnostics;
+
+    internal class Tracker
+    {
+        private readonly Stopwatch _timer;
+        private int maxTimeoutMs;
+
+        public Tracker(int timeoutMs)
+        {
+            maxTimeoutMs = timeoutMs;
+            _timer = new Stopwatch();
+            _timer.Start();
+        }
+
+        public bool IsTimedOut() => _timer.ElapsedMilliseconds > maxTimeoutMs;
+
+        public long msTillTimeout => maxTimeoutMs - _timer.ElapsedMilliseconds;
+
+        public void Reset(int newTimeoutMs)
+        {
+            maxTimeoutMs = newTimeoutMs;
+            _timer.Restart();
+        }
+    }
+
+    // TODO add mechnism to stop tracker when disposed
+    public sealed class MessageAcksTracker : IMessageAcksTracker<MessageId>
+    {
+        private readonly Dictionary<MessageId, Tracker> _trackers;
+        private readonly int _unackedTimeoutMs;
+        private readonly int _nackTimeoutMs;
+        private readonly int _trackerDelayMs;
+        public MessageAcksTracker(int unackedTimeoutMs, int nackTimeoutMs, int trackerDelayMs)
+        {
+            _unackedTimeoutMs = unackedTimeoutMs;
+            _nackTimeoutMs = nackTimeoutMs;
+            _trackerDelayMs = trackerDelayMs;
+            _trackers = new Dictionary<MessageId, Tracker>();
+        }
+
+        public async Task StartTracker(IConsumer consumer, CancellationToken cancellationToken)
+        {
+            await Task.Yield();
+
+            while (true)
+            {
+                await Task.Delay(_trackerDelayMs);
+
+                var messageIds = new List<MessageId>();
+                foreach (KeyValuePair<MessageId, Tracker> p in _trackers)
+                {
+                    if (p.Value.IsTimedOut())
+                        messageIds.Add(p.Key);
+                }
+
+                if (messageIds.Count() > 0)
+                    await consumer.RedeliverUnacknowledgedMessages(messageIds, cancellationToken).ConfigureAwait(false);
+
+            }
+        }
+        public MessageId Add(MessageId message)
+        {
+            if (!_trackers.ContainsKey(message))
+            {
+                _trackers.Add(message, new Tracker(_unackedTimeoutMs));
+            }
+
+            return message;
+        }
+        public MessageId Ack(MessageId message)
+        {
+            if (_trackers.ContainsKey(message))
+                _trackers.Remove(message);
+            return message;
+        }
+        public MessageId Nack(MessageId message)
+        {
+            if (_trackers.ContainsKey(message))
+            {
+                var timer = _trackers[message];
+                if (timer.msTillTimeout > _nackTimeoutMs)
+                    timer.Reset(_nackTimeoutMs);
+            }
+            else
+                _trackers.Add(message, new Tracker(_nackTimeoutMs));
+            return message;
+        }
+    }
+}

Review comment:
       Needs a new line

##########
File path: src/DotPulsar/Internal/MessageQueue.cs
##########
@@ -0,0 +1,45 @@
+namespace DotPulsar.Internal
+{
+    using Abstractions;
+    using DotPulsar.Abstractions;
+    using DotPulsar.Exceptions;
+    using Events;
+    using Microsoft.Extensions.ObjectPool;
+    using PulsarApi;
+    using System;
+    using System.Collections.Generic;
+    using System.Linq;
+    using System.Runtime.CompilerServices;
+    using System.Threading;
+    using System.Threading.Tasks;
+
+    public sealed class MessageQueue : IMessageQueue, IDequeue<MessagePackage>, IDisposable
+    {
+        private readonly AsyncQueue<MessagePackage> _queue;
+        private readonly IUnackedMessageTracker _tracker;
+        public MessageQueue(AsyncQueue<MessagePackage> queue, IUnackedMessageTracker tracker)
+        {
+            _queue = queue;
+            _tracker = tracker;
+        }
+        public async ValueTask<MessagePackage> Dequeue(CancellationToken cancellationToken = default)
+        {
+            var message = await _queue.Dequeue(cancellationToken).ConfigureAwait(false);
+            _tracker.Add(new MessageId(message.MessageId));
+            return message;
+        }
+        public void Acknowledge(MessageId obj) => _tracker.Ack(obj);

Review comment:
       Needs spacing

##########
File path: src/DotPulsar/Internal/MessageAcksTracker.cs
##########
@@ -0,0 +1,103 @@
+namespace DotPulsar.Internal
+{
+    using Abstractions;
+    using DotPulsar.Abstractions;
+    using DotPulsar.Exceptions;
+    using Events;
+    using Microsoft.Extensions.ObjectPool;
+    using PulsarApi;
+    using System;
+    using System.Collections.Generic;
+    using System.Linq;
+    using System.Runtime.CompilerServices;
+    using System.Threading;
+    using System.Threading.Tasks;
+    using System.Diagnostics;
+
+    internal class Tracker
+    {
+        private readonly Stopwatch _timer;
+        private int maxTimeoutMs;
+
+        public Tracker(int timeoutMs)
+        {
+            maxTimeoutMs = timeoutMs;
+            _timer = new Stopwatch();
+            _timer.Start();
+        }
+
+        public bool IsTimedOut() => _timer.ElapsedMilliseconds > maxTimeoutMs;
+
+        public long msTillTimeout => maxTimeoutMs - _timer.ElapsedMilliseconds;
+
+        public void Reset(int newTimeoutMs)
+        {
+            maxTimeoutMs = newTimeoutMs;
+            _timer.Restart();
+        }
+    }
+
+    // TODO add mechnism to stop tracker when disposed
+    public sealed class MessageAcksTracker : IMessageAcksTracker<MessageId>
+    {
+        private readonly Dictionary<MessageId, Tracker> _trackers;
+        private readonly int _unackedTimeoutMs;
+        private readonly int _nackTimeoutMs;
+        private readonly int _trackerDelayMs;
+        public MessageAcksTracker(int unackedTimeoutMs, int nackTimeoutMs, int trackerDelayMs)
+        {
+            _unackedTimeoutMs = unackedTimeoutMs;
+            _nackTimeoutMs = nackTimeoutMs;
+            _trackerDelayMs = trackerDelayMs;
+            _trackers = new Dictionary<MessageId, Tracker>();
+        }
+
+        public async Task StartTracker(IConsumer consumer, CancellationToken cancellationToken)
+        {
+            await Task.Yield();
+
+            while (true)
+            {
+                await Task.Delay(_trackerDelayMs);
+
+                var messageIds = new List<MessageId>();
+                foreach (KeyValuePair<MessageId, Tracker> p in _trackers)
+                {
+                    if (p.Value.IsTimedOut())
+                        messageIds.Add(p.Key);
+                }
+
+                if (messageIds.Count() > 0)
+                    await consumer.RedeliverUnacknowledgedMessages(messageIds, cancellationToken).ConfigureAwait(false);
+
+            }
+        }
+        public MessageId Add(MessageId message)
+        {
+            if (!_trackers.ContainsKey(message))
+            {

Review comment:
       No need for braces around single-line if-statements

##########
File path: src/DotPulsar/Internal/UnackedMessageTracker.cs
##########
@@ -0,0 +1,101 @@
+namespace DotPulsar.Internal

Review comment:
       We need the Apache license header at the top of every file

##########
File path: tests/DotPulsar.Tests/Internal/MessageAcksTrackerTests.cs
##########
@@ -0,0 +1,172 @@
+namespace DotPulsar.Tests.Internal
+{
+    using DotPulsar.Internal;
+    using DotPulsar.Abstractions;
+    using FluentAssertions;
+    using Xunit;
+    using System;
+    using AutoFixture;
+    using System.Threading;
+    using System.Threading.Tasks;
+    using System.Collections.Generic;
+    using System.Linq;
+    using System.Linq.Expressions;
+    using AutoFixture.AutoNSubstitute;
+    using NSubstitute;
+    using System.Diagnostics;
+
+    public class UnackedMessageTrackerTests
+    {
+        [Fact]
+        public void Test_Instance()

Review comment:
       I'm not sure this test makes much sense. If you call a constructor and it doesn't throw an exception, then the type returned is always what you expect.

##########
File path: src/DotPulsar/Internal/InactiveMessageAcksTracker.cs
##########
@@ -0,0 +1,30 @@
+namespace DotPulsar.Internal
+{
+    using Abstractions;
+    using DotPulsar.Abstractions;
+    using DotPulsar.Exceptions;
+    using Events;
+    using Microsoft.Extensions.ObjectPool;
+    using PulsarApi;
+    using System;
+    using System.Collections.Generic;
+    using System.Linq;
+    using System.Runtime.CompilerServices;
+    using System.Threading;
+    using System.Threading.Tasks;
+    using System.Diagnostics;
+
+    public sealed class InactiveMessageAcksTracker : IMessageAcksTracker<MessageId>
+    {
+        public InactiveMessageAcksTracker() { }
+
+        public async Task StartTracker(IConsumer consumer, CancellationToken cancellationToken)
+        {
+            await Task.Yield();
+        }
+
+        public MessageId Add(MessageId message) => message;
+        public MessageId Ack(MessageId message) => message;
+        public MessageId Nack(MessageId message) => message;
+    }
+}

Review comment:
       Needs a new line

##########
File path: src/DotPulsar/Internal/MessageAcksTracker.cs
##########
@@ -0,0 +1,103 @@
+namespace DotPulsar.Internal
+{
+    using Abstractions;
+    using DotPulsar.Abstractions;
+    using DotPulsar.Exceptions;
+    using Events;
+    using Microsoft.Extensions.ObjectPool;
+    using PulsarApi;
+    using System;
+    using System.Collections.Generic;
+    using System.Linq;
+    using System.Runtime.CompilerServices;
+    using System.Threading;
+    using System.Threading.Tasks;
+    using System.Diagnostics;
+
+    internal class Tracker
+    {
+        private readonly Stopwatch _timer;
+        private int maxTimeoutMs;
+
+        public Tracker(int timeoutMs)
+        {
+            maxTimeoutMs = timeoutMs;
+            _timer = new Stopwatch();

Review comment:
       A stopwatch can be created and started in one go: Stopwatch.StartNew();
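
A sketch of how the Tracker class from the diff might look with the stopwatch created and started in a single call via Stopwatch.StartNew(). The class name TrackerSketch and the switch from int milliseconds to TimeSpan are assumptions made for illustration, not part of the pull request.

```csharp
using System;
using System.Diagnostics;

// Hypothetical rewrite of the reviewed Tracker class, for illustration only.
public sealed class TrackerSketch
{
    private readonly Stopwatch _timer;
    private TimeSpan _timeout;

    public TrackerSketch(TimeSpan timeout)
    {
        _timeout = timeout;
        // Creates the stopwatch and starts it in one call.
        _timer = Stopwatch.StartNew();
    }

    // True once more time has elapsed than the current timeout allows.
    public bool IsTimedOut => _timer.Elapsed > _timeout;

    // Remaining time; negative once the timeout has passed.
    public TimeSpan TimeUntilTimeout => _timeout - _timer.Elapsed;

    // Applies a new timeout and measures from now.
    public void Reset(TimeSpan newTimeout)
    {
        _timeout = newTimeout;
        _timer.Restart();
    }
}
```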

##########
File path: src/DotPulsar/Internal/InactiveUnackedMessageTracker.cs
##########
@@ -0,0 +1,31 @@
+namespace DotPulsar.Internal
+{
+    using System.Threading;

Review comment:
       Usings should be sorted alphabetically. If you run a code cleanup, this should be handled for you automatically.

##########
File path: src/DotPulsar/Internal/MessageQueue.cs
##########
@@ -0,0 +1,45 @@
+namespace DotPulsar.Internal

Review comment:
       We need the Apache license header at the top of every file

##########
File path: src/DotPulsar/Internal/InactiveUnackedMessageTracker.cs
##########
@@ -0,0 +1,31 @@
+namespace DotPulsar.Internal
+{
+    using System.Threading;
+    using System.Threading.Tasks;
+    using Abstractions;
+    using DotPulsar.Abstractions;
+
+    public class InactiveUnackedMessageTracker : IUnackedMessageTracker
+    {
+        public InactiveUnackedMessageTracker()
+        {
+        }
+
+        public void Ack(MessageId messageId)
+        {
+            return;
+        }
+
+        public void Add(MessageId messageId)
+        {
+            return;
+        }
+
+        public Task Start(IConsumer consumer, CancellationToken cancellationToken = default) => Task.CompletedTask;
+
+        public void Dispose() {
+            return;
+        }
+

Review comment:
       Unnecessary spacing

##########
File path: src/DotPulsar/Internal/MessageAcksTracker.cs
##########
@@ -0,0 +1,103 @@
+namespace DotPulsar.Internal
+{
+    using Abstractions;
+    using DotPulsar.Abstractions;
+    using DotPulsar.Exceptions;
+    using Events;
+    using Microsoft.Extensions.ObjectPool;
+    using PulsarApi;
+    using System;
+    using System.Collections.Generic;
+    using System.Linq;
+    using System.Runtime.CompilerServices;
+    using System.Threading;
+    using System.Threading.Tasks;
+    using System.Diagnostics;
+
+    internal class Tracker

Review comment:
       Everything in the "Internal" namespace is internal, so no need to mark it as internal.

##########
File path: src/DotPulsar/Internal/MessageQueue.cs
##########
@@ -0,0 +1,45 @@
+namespace DotPulsar.Internal
+{
+    using Abstractions;
+    using DotPulsar.Abstractions;
+    using DotPulsar.Exceptions;
+    using Events;
+    using Microsoft.Extensions.ObjectPool;
+    using PulsarApi;
+    using System;
+    using System.Collections.Generic;
+    using System.Linq;
+    using System.Runtime.CompilerServices;
+    using System.Threading;
+    using System.Threading.Tasks;
+
+    public sealed class MessageQueue : IMessageQueue, IDequeue<MessagePackage>, IDisposable
+    {
+        private readonly AsyncQueue<MessagePackage> _queue;
+        private readonly IUnackedMessageTracker _tracker;
+        public MessageQueue(AsyncQueue<MessagePackage> queue, IUnackedMessageTracker tracker)

Review comment:
       Needs spacing

##########
File path: src/DotPulsar/Internal/MessageAcksTracker.cs
##########
@@ -0,0 +1,103 @@
+namespace DotPulsar.Internal
+{
+    using Abstractions;
+    using DotPulsar.Abstractions;
+    using DotPulsar.Exceptions;
+    using Events;
+    using Microsoft.Extensions.ObjectPool;
+    using PulsarApi;
+    using System;
+    using System.Collections.Generic;
+    using System.Linq;
+    using System.Runtime.CompilerServices;
+    using System.Threading;
+    using System.Threading.Tasks;
+    using System.Diagnostics;
+
+    internal class Tracker
+    {
+        private readonly Stopwatch _timer;
+        private int maxTimeoutMs;
+
+        public Tracker(int timeoutMs)
+        {
+            maxTimeoutMs = timeoutMs;
+            _timer = new Stopwatch();
+            _timer.Start();
+        }
+
+        public bool IsTimedOut() => _timer.ElapsedMilliseconds > maxTimeoutMs;
+
+        public long msTillTimeout => maxTimeoutMs - _timer.ElapsedMilliseconds;
+
+        public void Reset(int newTimeoutMs)
+        {
+            maxTimeoutMs = newTimeoutMs;
+            _timer.Restart();
+        }
+    }
+
+    // TODO add mechnism to stop tracker when disposed
+    public sealed class MessageAcksTracker : IMessageAcksTracker<MessageId>
+    {
+        private readonly Dictionary<MessageId, Tracker> _trackers;
+        private readonly int _unackedTimeoutMs;
+        private readonly int _nackTimeoutMs;
+        private readonly int _trackerDelayMs;
+        public MessageAcksTracker(int unackedTimeoutMs, int nackTimeoutMs, int trackerDelayMs)
+        {
+            _unackedTimeoutMs = unackedTimeoutMs;
+            _nackTimeoutMs = nackTimeoutMs;
+            _trackerDelayMs = trackerDelayMs;
+            _trackers = new Dictionary<MessageId, Tracker>();
+        }
+
+        public async Task StartTracker(IConsumer consumer, CancellationToken cancellationToken)
+        {
+            await Task.Yield();
+
+            while (true)
+            {
+                await Task.Delay(_trackerDelayMs);
+
+                var messageIds = new List<MessageId>();
+                foreach (KeyValuePair<MessageId, Tracker> p in _trackers)
+                {
+                    if (p.Value.IsTimedOut())
+                        messageIds.Add(p.Key);
+                }
+
+                if (messageIds.Count() > 0)
+                    await consumer.RedeliverUnacknowledgedMessages(messageIds, cancellationToken).ConfigureAwait(false);
+
+            }
+        }
+        public MessageId Add(MessageId message)
+        {
+            if (!_trackers.ContainsKey(message))
+            {
+                _trackers.Add(message, new Tracker(_unackedTimeoutMs));
+            }
+
+            return message;
+        }
+        public MessageId Ack(MessageId message)

Review comment:
       Let's have a single space between methods.

##########
File path: tests/DotPulsar.StressTests/DotPulsar.StressTests.csproj
##########
@@ -7,6 +7,7 @@
 
   <ItemGroup>
     <PackageReference Include="Microsoft.NET.Test.Sdk" Version="16.8.3" />
+    <PackageReference Include="Microsoft.Bcl.AsyncInterfaces" Version="5.0.0" />

Review comment:
       I think this is a Visual Studio Code leftover?

##########
File path: src/DotPulsar/Internal/MessageAcksTracker.cs
##########
@@ -0,0 +1,103 @@
+namespace DotPulsar.Internal

Review comment:
       We need the Apache license header at the top of every file

##########
File path: src/DotPulsar/Internal/MessageAcksTracker.cs
##########
@@ -0,0 +1,103 @@
+namespace DotPulsar.Internal
+{
+    using Abstractions;
+    using DotPulsar.Abstractions;
+    using DotPulsar.Exceptions;
+    using Events;
+    using Microsoft.Extensions.ObjectPool;
+    using PulsarApi;
+    using System;
+    using System.Collections.Generic;
+    using System.Linq;
+    using System.Runtime.CompilerServices;
+    using System.Threading;
+    using System.Threading.Tasks;
+    using System.Diagnostics;
+
+    internal class Tracker
+    {
+        private readonly Stopwatch _timer;
+        private int maxTimeoutMs;
+
+        public Tracker(int timeoutMs)
+        {
+            maxTimeoutMs = timeoutMs;
+            _timer = new Stopwatch();
+            _timer.Start();
+        }
+
+        public bool IsTimedOut() => _timer.ElapsedMilliseconds > maxTimeoutMs;
+
+        public long msTillTimeout => maxTimeoutMs - _timer.ElapsedMilliseconds;
+
+        public void Reset(int newTimeoutMs)
+        {
+            maxTimeoutMs = newTimeoutMs;
+            _timer.Restart();
+        }
+    }
+
+    // TODO add mechnism to stop tracker when disposed
+    public sealed class MessageAcksTracker : IMessageAcksTracker<MessageId>
+    {
+        private readonly Dictionary<MessageId, Tracker> _trackers;
+        private readonly int _unackedTimeoutMs;
+        private readonly int _nackTimeoutMs;
+        private readonly int _trackerDelayMs;
+        public MessageAcksTracker(int unackedTimeoutMs, int nackTimeoutMs, int trackerDelayMs)

Review comment:
       Let's have a single space between the declaration of fields and constructors

##########
File path: src/DotPulsar/Internal/MessageQueue.cs
##########
@@ -0,0 +1,45 @@
+namespace DotPulsar.Internal
+{
+    using Abstractions;
+    using DotPulsar.Abstractions;
+    using DotPulsar.Exceptions;
+    using Events;
+    using Microsoft.Extensions.ObjectPool;
+    using PulsarApi;
+    using System;
+    using System.Collections.Generic;
+    using System.Linq;
+    using System.Runtime.CompilerServices;
+    using System.Threading;
+    using System.Threading.Tasks;
+
+    public sealed class MessageQueue : IMessageQueue, IDequeue<MessagePackage>, IDisposable
+    {
+        private readonly AsyncQueue<MessagePackage> _queue;
+        private readonly IUnackedMessageTracker _tracker;
+        public MessageQueue(AsyncQueue<MessagePackage> queue, IUnackedMessageTracker tracker)
+        {
+            _queue = queue;
+            _tracker = tracker;
+        }
+        public async ValueTask<MessagePackage> Dequeue(CancellationToken cancellationToken = default)
+        {
+            var message = await _queue.Dequeue(cancellationToken).ConfigureAwait(false);
+            _tracker.Add(new MessageId(message.MessageId));
+            return message;
+        }
+        public void Acknowledge(MessageId obj) => _tracker.Ack(obj);
+
+        public void NegativeAcknowledge(MessageId obj)
+        {
+            throw new NotImplementedException();
+        }
+
+        public void Dispose()
+        {
+            _queue.Dispose();
+            _tracker.Dispose();
+        }
+        

Review comment:
       Unnecessary spacing

##########
File path: src/DotPulsar/Internal/UnackedMessageTracker.cs
##########
@@ -0,0 +1,101 @@
+namespace DotPulsar.Internal
+{
+    using Abstractions;
+    using DotPulsar.Abstractions;
+    using System;
+    using System.Collections.Concurrent;
+    using System.Diagnostics;
+    using System.Linq;
+    using System.Collections.Generic;
+    using System.Threading;
+    using System.Threading.Tasks;
+
+    public readonly struct AwaitingAck
+    {
+        public MessageId MessageId { get; }
+        public long Timestamp { get; }
+
+        public AwaitingAck(MessageId messageId)
+        {
+            MessageId = messageId;
+            Timestamp = Stopwatch.GetTimestamp();
+        }
+
+        public TimeSpan Elapsed => TimeSpan.FromTicks(
+            (long) ((Stopwatch.GetTimestamp() - Timestamp) / (double)Stopwatch.Frequency * TimeSpan.TicksPerSecond));
+    }
+
+    public sealed class UnackedMessageTracker : IUnackedMessageTracker
+    {
+        private readonly TimeSpan _ackTimeout;
+        private readonly TimeSpan _pollingTimeout;
+        private readonly ConcurrentQueue<AwaitingAck> _awaitingAcks;
+        private readonly List<MessageId> _acked;
+        private readonly CancellationTokenSource _cancellationTokenSource;
+
+        
+        public UnackedMessageTracker(TimeSpan ackTimeout, TimeSpan pollingTimeout)
+        {
+            _ackTimeout = ackTimeout;
+            _pollingTimeout = pollingTimeout;
+            _awaitingAcks = new ConcurrentQueue<AwaitingAck>();
+            _acked = new List<MessageId>();
+            _cancellationTokenSource = new CancellationTokenSource();
+        }
+
+        public void Add(MessageId messageId)
+        {
+            _awaitingAcks.Enqueue(new AwaitingAck(messageId));
+        }
+
+        public void Ack(MessageId messageId)
+        {
+            // We only need to store the highest cumulative ack we see (if there is one)
+            // and the MessageIds not included by that cumulative ack.
+            _acked.Add(messageId);
+        }
+
+        public Task Start(IConsumer consumer, CancellationToken cancellationToken = default)
+        {
+            CancellationToken token =
+              CancellationTokenSource.CreateLinkedTokenSource(
+                  _cancellationTokenSource.Token, cancellationToken).Token;
+
+            return Task.Run(async () => {

Review comment:
       Starting a task here doesn't really make much sense. If you await "Start", the result is the same whether or not you create this task.
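
   A minimal sketch of that point, not the tracker's actual code: StartSketch, CountPollsAsync and CountPollsWrapped are hypothetical names standing in for the polling loop in "Start". Awaiting the async method directly and awaiting the same method wrapped in Task.Run yield the same value; the wrapper only moves the synchronous part before the first await onto the thread pool.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

public static class StartSketch
{
    // Hypothetical stand-in for the polling loop: an async method already
    // returns a Task that the caller can await, so no Task.Run is required.
    public static async Task<int> CountPollsAsync(
        int polls, TimeSpan interval, CancellationToken token = default)
    {
        var completed = 0;
        for (var i = 0; i < polls; i++)
        {
            // Throws TaskCanceledException if the token is cancelled.
            await Task.Delay(interval, token).ConfigureAwait(false);
            completed++;
        }
        return completed;
    }

    // The same work wrapped in Task.Run, as in the code under review.
    public static Task<int> CountPollsWrapped(
        int polls, TimeSpan interval, CancellationToken token = default)
        => Task.Run(() => CountPollsAsync(polls, interval, token));

    public static async Task Main()
    {
        var direct = await CountPollsAsync(3, TimeSpan.FromMilliseconds(1));
        var wrapped = await CountPollsWrapped(3, TimeSpan.FromMilliseconds(1));
        Console.WriteLine(direct == wrapped); // prints "True"
    }
}
```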

##########
File path: tests/DotPulsar.Tests/Internal/MessageAcksTrackerTests.cs
##########
@@ -0,0 +1,172 @@
+namespace DotPulsar.Tests.Internal
+{
+    using DotPulsar.Internal;
+    using DotPulsar.Abstractions;
+    using FluentAssertions;
+    using Xunit;
+    using System;
+    using AutoFixture;
+    using System.Threading;
+    using System.Threading.Tasks;
+    using System.Collections.Generic;
+    using System.Linq;
+    using System.Linq.Expressions;
+    using AutoFixture.AutoNSubstitute;
+    using NSubstitute;
+    using System.Diagnostics;
+
+    public class UnackedMessageTrackerTests
+    {
+        [Fact]
+        public void Test_Instance()
+        {
+            var tracker = new UnackedMessageTracker(TimeSpan.FromSeconds(10), TimeSpan.FromSeconds(1));
+            tracker.Should().BeOfType<UnackedMessageTracker>();
+        }
+
+
+        [Fact]
+        public async void Test_AwaitingAck_Elapsed()
+        {
+            //Arrange
+            var messageId = MessageId.Latest;
+            var sw = new Stopwatch();
+            sw.Start();
+
+            //Act
+            var awaiting = new AwaitingAck(messageId);
+            await Task.Delay(TimeSpan.FromMilliseconds(123));
+            sw.Stop();
+
+            //Assert
+            awaiting.Elapsed.Should().BeCloseTo(sw.Elapsed, 1);
+        }
+
+        [Fact]
+        public async void Test_Start_Message()
+        {
+            //Arrange
+            var fixture = new Fixture();
+            fixture.Customize(new AutoNSubstituteCustomization());
+            var consumer = Substitute.For<IConsumer>();
+            var messageId = MessageId.Latest;
+            var cts = new CancellationTokenSource();
+
+
+            var tracker = new UnackedMessageTracker(
+                TimeSpan.FromMilliseconds(10),
+                TimeSpan.FromMilliseconds(1));
+
+            //Act
+            tracker.Add(messageId);
+            cts.CancelAfter(20);
+            try { await tracker.Start(consumer, cts.Token); }
+            catch (TaskCanceledException) { }
+
+            //Assert
+            await consumer
+                .Received(1)
+                .RedeliverUnacknowledgedMessages(
+                    Arg.Is(EquivalentTo(new List<MessageId>() { messageId })),
+                    Arg.Any<CancellationToken>());
+        }
+
+        [Fact]
+        public async void Test_Start_Message_Ack_In_Time()
+        {
+            //Arrange
+            var fixture = new Fixture();
+            fixture.Customize(new AutoNSubstituteCustomization());
+            var consumer = Substitute.For<IConsumer>();
+            var messageId = MessageId.Latest;
+            var cts = new CancellationTokenSource();
+
+
+            var tracker = new UnackedMessageTracker(
+                TimeSpan.FromMilliseconds(10),
+                TimeSpan.FromMilliseconds(1));
+
+            //Act
+            tracker.Add(messageId);
+            cts.CancelAfter(20);
+            var _ = Task.Delay(5).ContinueWith(_ => tracker.Ack(messageId));
+            try { await tracker.Start(consumer, cts.Token); }
+            catch (TaskCanceledException) { }
+
+            //Assert
+            await consumer
+                .DidNotReceive()
+                .RedeliverUnacknowledgedMessages(
+                    Arg.Any<IEnumerable<MessageId>>(),
+                    Arg.Any<CancellationToken>());
+        }
+
+        [Fact]
+        public async void Test_Start_Message_Ack_Too_Late()
+        {
+            //Arrange
+            var fixture = new Fixture();
+            fixture.Customize(new AutoNSubstituteCustomization());
+            var consumer = Substitute.For<IConsumer>();
+            var messageId = MessageId.Latest;
+            var cts = new CancellationTokenSource();
+
+
+            var tracker = new UnackedMessageTracker(
+                TimeSpan.FromMilliseconds(10),
+                TimeSpan.FromMilliseconds(1));
+
+            //Act
+            tracker.Add(messageId);
+            cts.CancelAfter(20);
+
+            var _ = Task.Delay(15).ContinueWith(_ => tracker.Ack(messageId));
+            try { await tracker.Start(consumer, cts.Token); }
+            catch (TaskCanceledException) { }
+
+            //Assert
+            await consumer
+                .Received(1)
+                .RedeliverUnacknowledgedMessages(
+                    Arg.Any<IEnumerable<MessageId>>(),
+                    Arg.Any<CancellationToken>());
+        }
+
+        [Fact]
+        public async void Test_Start_Redeliver_Only_Cnce()
+        {
+            //Arrange
+            var fixture = new Fixture();
+            fixture.Customize(new AutoNSubstituteCustomization());
+            var consumer = Substitute.For<IConsumer>();
+            var messageId = MessageId.Latest;
+            var cts = new CancellationTokenSource();
+
+
+            var tracker = new UnackedMessageTracker(
+                TimeSpan.FromMilliseconds(10),
+                TimeSpan.FromMilliseconds(5));
+
+            //Act
+            tracker.Add(messageId);
+            cts.CancelAfter(50);
+            try { await tracker.Start(consumer, cts.Token); }
+            catch (TaskCanceledException) { }
+
+            //Assert
+            await consumer
+                .Received(1)
+                .RedeliverUnacknowledgedMessages(
+                    Arg.Any<IEnumerable<MessageId>>(),
+                    Arg.Any<CancellationToken>());
+        }
+
+
+        private Expression<Predicate<IEnumerable<T>>> EquivalentTo<T>(IEnumerable<T> enumerable) =>
+            x => IsEquivalentIEnumerable(enumerable, x);
+
+
+        private bool IsEquivalentIEnumerable<T>(IEnumerable<T> a, IEnumerable<T> b) =>
+            a.Count() == b.Count() && a.Zip(b, (a_, b_) => a_.Equals(b_)).All(_ => _);
+    }
+}

Review comment:
       Needs a new line

##########
File path: src/DotPulsar/Internal/UnackedMessageTracker.cs
##########
@@ -0,0 +1,101 @@
+namespace DotPulsar.Internal
+{
+    using Abstractions;
+    using DotPulsar.Abstractions;
+    using System;
+    using System.Collections.Concurrent;
+    using System.Diagnostics;
+    using System.Linq;
+    using System.Collections.Generic;
+    using System.Threading;
+    using System.Threading.Tasks;
+
+    public readonly struct AwaitingAck
+    {
+        public MessageId MessageId { get; }
+        public long Timestamp { get; }
+
+        public AwaitingAck(MessageId messageId)
+        {
+            MessageId = messageId;
+            Timestamp = Stopwatch.GetTimestamp();
+        }
+
+        public TimeSpan Elapsed => TimeSpan.FromTicks(
+            (long) ((Stopwatch.GetTimestamp() - Timestamp) / (double)Stopwatch.Frequency * TimeSpan.TicksPerSecond));
+    }
+
+    public sealed class UnackedMessageTracker : IUnackedMessageTracker
+    {
+        private readonly TimeSpan _ackTimeout;
+        private readonly TimeSpan _pollingTimeout;
+        private readonly ConcurrentQueue<AwaitingAck> _awaitingAcks;
+        private readonly List<MessageId> _acked;
+        private readonly CancellationTokenSource _cancellationTokenSource;
+

Review comment:
       Unnecessary double spacing; a single blank line is enough.

##########
File path: tests/DotPulsar.Tests/Internal/MessageAcksTrackerTests.cs
##########
@@ -0,0 +1,172 @@
+namespace DotPulsar.Tests.Internal

Review comment:
       We need the apache header at the top of every file

##########
File path: tests/DotPulsar.Tests/Internal/MessageAcksTrackerTests.cs
##########
@@ -0,0 +1,172 @@
+namespace DotPulsar.Tests.Internal
+{
+    using DotPulsar.Internal;
+    using DotPulsar.Abstractions;
+    using FluentAssertions;
+    using Xunit;
+    using System;
+    using AutoFixture;
+    using System.Threading;
+    using System.Threading.Tasks;
+    using System.Collections.Generic;
+    using System.Linq;
+    using System.Linq.Expressions;
+    using AutoFixture.AutoNSubstitute;
+    using NSubstitute;
+    using System.Diagnostics;
+
+    public class UnackedMessageTrackerTests
+    {
+        [Fact]
+        public void Test_Instance()
+        {
+            var tracker = new UnackedMessageTracker(TimeSpan.FromSeconds(10), TimeSpan.FromSeconds(1));
+            tracker.Should().BeOfType<UnackedMessageTracker>();
+        }
+
+
+        [Fact]
+        public async void Test_AwaitingAck_Elapsed()
+        {
+            //Arrange
+            var messageId = MessageId.Latest;
+            var sw = new Stopwatch();
+            sw.Start();
+
+            //Act
+            var awaiting = new AwaitingAck(messageId);
+            await Task.Delay(TimeSpan.FromMilliseconds(123));
+            sw.Stop();
+
+            //Assert
+            awaiting.Elapsed.Should().BeCloseTo(sw.Elapsed, 1);
+        }
+
+        [Fact]
+        public async void Test_Start_Message()

Review comment:
       The naming convention for tests is Method/Feature_[Given]State_[Should]ExpectedResult.
   E.g., from AsyncLockTests:
   Dispose_GivenLockIsDisposedWhileItIsTaken_ShouldNotCompleteBeforeItIsReleased()

##########
File path: tests/DotPulsar.Tests/Internal/MessageAcksTrackerTests.cs
##########
@@ -0,0 +1,172 @@
+namespace DotPulsar.Tests.Internal
+{
+    using DotPulsar.Internal;
+    using DotPulsar.Abstractions;
+    using FluentAssertions;
+    using Xunit;
+    using System;
+    using AutoFixture;
+    using System.Threading;
+    using System.Threading.Tasks;
+    using System.Collections.Generic;
+    using System.Linq;
+    using System.Linq.Expressions;
+    using AutoFixture.AutoNSubstitute;
+    using NSubstitute;
+    using System.Diagnostics;
+
+    public class UnackedMessageTrackerTests
+    {
+        [Fact]
+        public void Test_Instance()
+        {
+            var tracker = new UnackedMessageTracker(TimeSpan.FromSeconds(10), TimeSpan.FromSeconds(1));
+            tracker.Should().BeOfType<UnackedMessageTracker>();
+        }
+
+
+        [Fact]
+        public async void Test_AwaitingAck_Elapsed()
+        {
+            //Arrange
+            var messageId = MessageId.Latest;
+            var sw = new Stopwatch();

Review comment:
       A stopwatch can be created and started in one go: Stopwatch.StartNew();
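
   A small sketch comparing the two forms. StopwatchSketch is a hypothetical name; Stopwatch.StartNew() is the standard System.Diagnostics factory method that creates a stopwatch and starts it in one call.

```csharp
using System;
using System.Diagnostics;

public static class StopwatchSketch
{
    public static void Main()
    {
        // Two-step form used in the test under review.
        var twoStep = new Stopwatch();
        twoStep.Start();

        // One-step equivalent: creates the instance and starts it.
        var oneStep = Stopwatch.StartNew();

        Console.WriteLine(twoStep.IsRunning && oneStep.IsRunning); // prints "True"
    }
}
```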

##########
File path: tests/DotPulsar.Tests/Internal/MessageAcksTrackerTests.cs
##########
@@ -0,0 +1,172 @@
+namespace DotPulsar.Tests.Internal
+{
+    using DotPulsar.Internal;
+    using DotPulsar.Abstractions;
+    using FluentAssertions;
+    using Xunit;
+    using System;
+    using AutoFixture;
+    using System.Threading;
+    using System.Threading.Tasks;
+    using System.Collections.Generic;
+    using System.Linq;
+    using System.Linq.Expressions;
+    using AutoFixture.AutoNSubstitute;
+    using NSubstitute;
+    using System.Diagnostics;
+
+    public class UnackedMessageTrackerTests
+    {
+        [Fact]
+        public void Test_Instance()
+        {
+            var tracker = new UnackedMessageTracker(TimeSpan.FromSeconds(10), TimeSpan.FromSeconds(1));
+            tracker.Should().BeOfType<UnackedMessageTracker>();
+        }
+

Review comment:
       Unnecessary double spacing; a single blank line is enough.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org