Posted to dev@pulsar.apache.org by GitBox <gi...@apache.org> on 2020/12/21 15:37:51 UTC

[GitHub] [pulsar-dotpulsar] dionjansen opened a new pull request #67: Support for (optional) ack timeout and nack delay for consumers

dionjansen opened a new pull request #67:
URL: https://github.com/apache/pulsar-dotpulsar/pull/67


   Implements: #45 , #46 
   
   ## General comments
   
   * I've added a reference to Microsoft.Bcl.AsyncInterfaces to support the IDisposable interface in net5.0. I only had this issue in VSCode for the test and sample projects. This does not influence the tests or running samples on VSC (on mac).
   * This is still a draft mostly to get the discussion going. 
   * I would like to add some more unit tests, though this is a bit hard without a proper mocking framework. Would you be open to introducing something like Moq to allow testing classes in a shallow way? Or would you rather I follow a different unit test strategy?
   
   ## Discussion
   I think we can split the discussion into two parts.
   
   1. The integration with the rest of the lib: what is created where, what is started, what is optional, and the general structure/logic of the lib
   2. The implementation of the tracker: performance concerns, disposing
   
   @blankensteiner I would first like to get some thoughts on the current implementation. So far I have mostly focused on setting things up so I don't break existing processes:
   
   ### Integration
   1. Right now I create an `IMessageAcksTracker` instance in the `PulsarClient`, though when clients do not have the consumer configured to use the tracking, a dummy `InactiveMessageAcksTracker` is passed. This instance is passed to the `ConsumerChannelFactory` and used when a channel is created. Does this setup make sense to you?
   2. The `ConsumerChannel` now expects a `MessageQueue` which wraps the tracker and the `AsyncQueue`. The channel informs the tracker through this queue when acking (and when nacking, which I can add later), and the dequeue method automatically starts tracking received messages.
   3. When the tracker is started it runs indefinitely. I find it hard to find a good place to start this thread, also since I need the consumer to call `RedeliverUnacknowledgedMessages`. Any ideas on how to improve this pattern? Perhaps a static method like some of the `StateMonitor` methods.
   4. I wonder how batched messages fit into all this. Does a batch have a single message id? And should I be able to just redeliver that message id to release the batch back to the broker?
   
   ### Implementation
   1. The `MessageAcksTracker` uses a polling mechanism to re-check for timed out messages (either due to being unacked for too long or the nack delay has been exceeded). Is this (generally) what you were thinking about too? Alternatively I could think of an approach where polling is done through a [Timer](https://docs.microsoft.com/en-us/dotnet/api/system.threading.timer?view=net-5.0). Or we could create individual Tasks for each message added to the tracker but I'm concerned about the overhead created by this.
   2. Atm I haven't considered the scenario yet that only a nack delay is configured and not an ack timeout in which case we will not have to track all dequeued messages.
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar-dotpulsar] blankensteiner commented on a change in pull request #67: Support for (optional) ack timeout and nack delay for consumers

Posted by GitBox <gi...@apache.org>.
blankensteiner commented on a change in pull request #67:
URL: https://github.com/apache/pulsar-dotpulsar/pull/67#discussion_r553298892



##########
File path: src/DotPulsar/Internal/UnackedMessageTracker.cs
##########
@@ -0,0 +1,115 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace DotPulsar.Internal
+{
+    using Abstractions;
+    using DotPulsar.Abstractions;
+    using System;
+    using System.Collections.Concurrent;
+    using System.Diagnostics;
+    using System.Linq;
+    using System.Collections.Generic;
+    using System.Threading;
+    using System.Threading.Tasks;
+
+    public readonly struct AwaitingAck
+    {
+        public MessageId MessageId { get; }
+        public long Timestamp { get; }
+
+        public AwaitingAck(MessageId messageId)
+        {
+            MessageId = messageId;
+            Timestamp = Stopwatch.GetTimestamp();
+        }
+
+        public TimeSpan Elapsed => TimeSpan.FromTicks(
+            (long) ((Stopwatch.GetTimestamp() - Timestamp) / (double) Stopwatch.Frequency * TimeSpan.TicksPerSecond));
+    }
+
+    public sealed class UnackedMessageTracker : IUnackedMessageTracker

Review comment:
       For the class and interface names, let's just use the full wording: [I]UnacknowledgedMessageTracker.







[GitHub] [pulsar-dotpulsar] dionjansen commented on a change in pull request #67: Support for (optional) ack timeout and nack delay for consumers

Posted by GitBox <gi...@apache.org>.
dionjansen commented on a change in pull request #67:
URL: https://github.com/apache/pulsar-dotpulsar/pull/67#discussion_r552565028



##########
File path: src/DotPulsar/Internal/UnackedMessageTracker.cs
##########
@@ -0,0 +1,101 @@
+namespace DotPulsar.Internal
+{
+    using Abstractions;
+    using DotPulsar.Abstractions;
+    using System;
+    using System.Collections.Concurrent;
+    using System.Diagnostics;
+    using System.Linq;
+    using System.Collections.Generic;
+    using System.Threading;
+    using System.Threading.Tasks;
+
+    public readonly struct AwaitingAck
+    {
+        public MessageId MessageId { get; }
+        public long Timestamp { get; }
+
+        public AwaitingAck(MessageId messageId)
+        {
+            MessageId = messageId;
+            Timestamp = Stopwatch.GetTimestamp();
+        }
+
+        public TimeSpan Elapsed => TimeSpan.FromTicks(
+            (long) ((Stopwatch.GetTimestamp() - Timestamp) / (double)Stopwatch.Frequency * TimeSpan.TicksPerSecond));
+    }
+
+    public sealed class UnackedMessageTracker : IUnackedMessageTracker
+    {
+        private readonly TimeSpan _ackTimeout;
+        private readonly TimeSpan _pollingTimeout;
+        private readonly ConcurrentQueue<AwaitingAck> _awaitingAcks;
+        private readonly List<MessageId> _acked;
+        private readonly CancellationTokenSource _cancellationTokenSource;
+
+        
+        public UnackedMessageTracker(TimeSpan ackTimeout, TimeSpan pollingTimeout)
+        {
+            _ackTimeout = ackTimeout;
+            _pollingTimeout = pollingTimeout;
+            _awaitingAcks = new ConcurrentQueue<AwaitingAck>();
+            _acked = new List<MessageId>();
+            _cancellationTokenSource = new CancellationTokenSource();
+        }
+
+        public void Add(MessageId messageId)
+        {
+            _awaitingAcks.Enqueue(new AwaitingAck(messageId));
+        }
+
+        public void Ack(MessageId messageId)
+        {
+            // We only need to store the highest cumulative ack we see (if there is one)
+            // and the MessageIds not included by that cumulative ack.
+            _acked.Add(messageId);
+        }
+
+        public Task Start(IConsumer consumer, CancellationToken cancellationToken = default)
+        {
+            CancellationToken token =
+              CancellationTokenSource.CreateLinkedTokenSource(
+                  _cancellationTokenSource.Token, cancellationToken).Token;
+
+            return Task.Run(async () => {

Review comment:
       I'm not sure I understand. Did you mean ***doesn't** really make much sense*? Shouldn't I always create a task so that the while loop isn't blocking the caller thread?







[GitHub] [pulsar-dotpulsar] blankensteiner commented on pull request #67: Support for (optional) ack timeout and nack delay for consumers

Posted by GitBox <gi...@apache.org>.
blankensteiner commented on pull request #67:
URL: https://github.com/apache/pulsar-dotpulsar/pull/67#issuecomment-749551670


   Hi @dionjansen 
   Thanks for the PR!
   I'll try and answer the best I can :-)
   
   > I've added a reference to Microsoft.Bcl.AsyncInterfaces to support the IDisposable interface in net5.0. I only had this issue in VSCode for the test and sample projects. This does not influence the tests or running samples on VSC (on mac).
   
   We have tried adding support for Visual Studio Code before, but sadly it's just not a pleasant road to go down. I have no idea why Microsoft has created two IDEs for C# and doesn't ensure that they behave the same.
   Visual Studio Code will create nonsense warnings and require unnecessary changes to the code-base and therefore it's not supported for developing DotPulsar. Visual Studio Community Edition and the commercial offerings and Rider are supported, so you have to use one of those.
   
   > I would like to add some more unit tests, though this is a bit hard without a proper mocking framework. Would you be open to introducing something like Moq to allow testing classes in a shallow way? Or would you rather I follow a different unit test strategy?
   
   Feel free to add one or more of these (if you need them):
   
   - AutoFixture
   - AutoFixture.AutoNSubstitute
   - AutoFixture.Xunit2
   - NSubstitute
   
   > Right now I create an IMessageAcksTracker instance in the PulsarClient, though when clients do not have the consumer configured to use the tracking, a dummy InactiveMessageAcksTracker is passed. This instance is passed to the ConsumerChannelFactory and used when a channel is created. Does this setup make sense to you?
   
   Yes.
   
   > When the tracker is started it runs indefinitely. I find it hard to find a good place to start this thread, also since I need the consumer to call RedeliverUnacknowledgedMessages. Any ideas on how to improve this pattern? Perhaps a static method like some of the StateMonitor methods.
   
   The question is whether you actually need the consumer or just the consumer channel. The tracking should start and end together with the MessageQueue/ConsumerChannel.
   
   > I wonder how batched messages fit into all this. Does a batch have a single message id? And should I be able to just redeliver that message id to release the batch back to the broker?
   
   That's a really good question. BookKeeper stores batched messages as one entry, so if you have a batched message consisting of 5 messages and you ack 4 of them, but the last one times out and you ask the broker to redeliver, you will get the entire batch again. You could keep track of this, but it will hurt performance. I don't know what the other clients do here, but maybe you could test that?
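
As a rough illustration of the "keep track of this" option, here is a minimal sketch in which a batch only counts as done once every message in it has been acked. Everything in it is an assumption for illustration: `BatchAckTracker`, the `long` entry id standing in for a batch's MessageId, and the `Register`/`Ack` methods are hypothetical and not part of DotPulsar. It is not thread-safe, and the extra per-batch bookkeeping is exactly the performance cost mentioned above.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical helper, not part of DotPulsar: remembers which messages of a
// batch have been acked so the batch is only treated as complete at the end.
public sealed class BatchAckTracker
{
    // Key: stand-in for the batch's entry id. Value: one flag per message in the batch.
    private readonly Dictionary<long, bool[]> _batches = new Dictionary<long, bool[]>();

    // Called when the first message of a batch is dequeued.
    public void Register(long entryId, int batchSize)
    {
        if (!_batches.ContainsKey(entryId))
            _batches[entryId] = new bool[batchSize];
    }

    // Marks one message as acked. Returns true only when the whole batch is acked.
    public bool Ack(long entryId, int batchIndex)
    {
        if (!_batches.TryGetValue(entryId, out var acked))
            return false; // unknown or already completed batch

        acked[batchIndex] = true;

        if (Array.IndexOf(acked, false) >= 0)
            return false; // at least one message is still outstanding

        _batches.Remove(entryId);
        return true;
    }
}
```

With such a tracker, a timeout on a partially acked batch would still redeliver the whole batch; the sketch only makes that situation visible to the client.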
   
   > The MessageAcksTracker uses a polling mechanism to re-check for timed out messages (either due to being unacked for too long or the nack delay has been exceeded). Is this (generally) what you were thinking about too? Alternatively I could think of an approach where polling is done through a Timer. Or we could create individual Tasks for each message added to the tracker but I'm concerned about the overhead created by this.
   
   I agree that a timer/task per message will hurt performance too much. Having one task for the entire Consumer/MessageQueue, which wakes up and looks at what needs to be redelivered, is the right solution. Here we need to find a thread-safe and performant way of storing and accessing this information.
   
   > Atm I haven't considered the scenario yet that only a nack delay is configured and not an ack timeout in which case we will not have to track all dequeued messages.
   
   Just a boolean check on dequeue and ack? We also need to handle cumulative acknowledgment.
   
   Consider this ackTimeout implementation. First, ackTimeout should be given as a TimeSpan (instead of an int or long in milli- or microseconds).
   
   When a message is dequeued, and if we have an ackTimeout, then we store the MessageId and Stopwatch.GetTimestamp() in an "AwaitingAck" struct in a ConcurrentQueue, let's call it "AwaitingAcks". Other suggestions for concurrent collections with fast insertion are welcome.
   
   When a message is acknowledged, and if we have an ackTimeout, then we store the acks instead of removing them from "AwaitingAcks". If we want to remove them right away, then we need the "AwaitingAcks" collection to support both iteration and random deletion. We only need to store the highest cumulative ack we see (if there is one) and the MessageIds not included by that cumulative ack.
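
The "highest cumulative ack plus the ids above it" idea could be sketched roughly as follows. This is only an illustration under stated assumptions: `AckedSet` and its members are hypothetical names, and a plain `long` stands in for MessageId on the assumption that message ids are totally ordered. A simple lock is used for thread safety; a real implementation might prefer something cheaper.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical sketch: stores one cumulative watermark and only those
// individually acked ids that lie above it.
public sealed class AckedSet
{
    private readonly object _lock = new object();
    private readonly HashSet<long> _individual = new HashSet<long>();
    private long _highestCumulative = -1; // -1 means no cumulative ack seen yet

    public void AckIndividual(long messageId)
    {
        lock (_lock)
        {
            // Ids at or below the watermark are already covered by it.
            if (messageId > _highestCumulative)
                _individual.Add(messageId);
        }
    }

    public void AckCumulative(long messageId)
    {
        lock (_lock)
        {
            if (messageId <= _highestCumulative)
                return;

            _highestCumulative = messageId;
            // Drop individual acks that the new watermark now includes.
            _individual.RemoveWhere(id => id <= messageId);
        }
    }

    public bool IsAcked(long messageId)
    {
        lock (_lock)
        {
            return messageId <= _highestCumulative || _individual.Contains(messageId);
        }
    }

    public int IndividualCount
    {
        get { lock (_lock) { return _individual.Count; } }
    }
}
```

The point of the design is that a cumulative ack shrinks the stored set instead of growing it, so memory stays bounded by the number of out-of-order individual acks.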
   
   When the ackTracker wakes up, it first calculates what ackTimeout is in Stopwatch ticks (those are not the same as TimeSpan ticks) and then calls Stopwatch.GetTimestamp(). We will now TryPeek and TryDequeue from "AwaitingAcks" for as long as the tracker timestamp - AwaitingAck.Timestamp is larger than the calculated timeout.
   If the MessageId is not acknowledged, it's added to a CommandRedeliverUnacknowledgedMessages (that we are reusing), which is then sent (if MessageIds were added).
   If the MessageId was acknowledged, then we can remove that MessageId from the AckedMessageIds.
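
The wake-up step described above can be sketched as below. This is a minimal sketch under assumptions, not the PR's code: `AckTimeoutSweeper` is a hypothetical name, a `long` stands in for MessageId, and the current Stopwatch timestamp is passed in as a parameter so the logic can be exercised without real delays. It also assumes a single sweeping task, since TryPeek followed by TryDequeue is not atomic across several consumers of the queue.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Diagnostics;

public readonly struct AwaitingAck
{
    public AwaitingAck(long messageId, long timestamp)
    {
        MessageId = messageId;
        Timestamp = timestamp;
    }

    public long MessageId { get; }  // stand-in for DotPulsar's MessageId
    public long Timestamp { get; }  // Stopwatch ticks, not TimeSpan ticks
}

// Hypothetical sketch of the ack-timeout sweep.
public sealed class AckTimeoutSweeper
{
    private readonly ConcurrentQueue<AwaitingAck> _awaitingAcks = new ConcurrentQueue<AwaitingAck>();
    private readonly ConcurrentDictionary<long, byte> _acked = new ConcurrentDictionary<long, byte>();
    private readonly long _timeoutInStopwatchTicks;

    public AckTimeoutSweeper(TimeSpan ackTimeout)
    {
        // Stopwatch.Frequency is the number of Stopwatch ticks per second.
        _timeoutInStopwatchTicks = (long) (ackTimeout.TotalSeconds * Stopwatch.Frequency);
    }

    // "now" would normally be Stopwatch.GetTimestamp().
    public void Add(long messageId, long now) => _awaitingAcks.Enqueue(new AwaitingAck(messageId, now));

    public void Ack(long messageId) => _acked.TryAdd(messageId, 0);

    // Returns the ids whose timeout has passed without an ack.
    public List<long> Sweep(long now)
    {
        var expired = new List<long>();

        while (_awaitingAcks.TryPeek(out var head) && now - head.Timestamp > _timeoutInStopwatchTicks)
        {
            if (!_awaitingAcks.TryDequeue(out var item))
                break;

            // Acked in time: forget the ack. Otherwise: schedule for redelivery.
            if (!_acked.TryRemove(item.MessageId, out _))
                expired.Add(item.MessageId);
        }

        return expired;
    }
}
```

Because the queue is ordered by dequeue time, the sweep can stop at the first entry that has not yet timed out, so each wake-up only touches entries that are actually due.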
   
   Consider this nackTimeout implementation.
   
   When a message is nacked, we add the MessageId(s) to a CommandRedeliverUnacknowledgedMessages that we are reusing (should our "RedeliverUnacknowledgedMessages" method taking an enumerable of MessageIds actually have been called "NegativeAcknowledge"?).
   
   When the nackTracker wakes up it will check if the CommandRedeliverUnacknowledgedMessages has MessageIds and if yes, then send it.
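
A rough sketch of that nack flow, under assumptions: `NackBuffer`, `Drain` and `RunAsync` are hypothetical names, a `long` stands in for MessageId, and the `redeliver` delegate stands in for sending the redelivery command. Unlike the description above, the sketch swaps in a fresh list on each drain instead of reusing one command object, which keeps the example short at the cost of an allocation per wake-up.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical sketch: collects nacked ids and hands them over in one go on wake-up.
public sealed class NackBuffer
{
    private readonly object _lock = new object();
    private List<long> _pending = new List<long>();

    public void Nack(long messageId)
    {
        lock (_lock) { _pending.Add(messageId); }
    }

    // Returns everything nacked since the last drain (possibly an empty list).
    public List<long> Drain()
    {
        lock (_lock)
        {
            if (_pending.Count == 0)
                return new List<long>();

            var drained = _pending;
            _pending = new List<long>();
            return drained;
        }
    }

    // Wakes up once per interval and sends the pending ids, if there are any.
    public async Task RunAsync(
        Func<IReadOnlyList<long>, CancellationToken, Task> redeliver,
        TimeSpan interval,
        CancellationToken token)
    {
        while (!token.IsCancellationRequested)
        {
            var batch = Drain();
            if (batch.Count > 0)
                await redeliver(batch, token).ConfigureAwait(false);

            try { await Task.Delay(interval, token).ConfigureAwait(false); }
            catch (OperationCanceledException) { return; }
        }
    }
}
```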
   
   Writing such a detailed implementation description was not what I intended, but once I get going.... :-D
   Anyway, if it is unclear, then I can try and make those classes/structs and push them to master.





[GitHub] [pulsar-dotpulsar] dionjansen commented on a change in pull request #67: Support for (optional) ack timeout and nack delay for consumers

Posted by GitBox <gi...@apache.org>.
dionjansen commented on a change in pull request #67:
URL: https://github.com/apache/pulsar-dotpulsar/pull/67#discussion_r552892166



##########
File path: tests/DotPulsar.Tests/Internal/MessageAcksTrackerTests.cs
##########
@@ -0,0 +1,172 @@
+namespace DotPulsar.Tests.Internal
+{
+    using DotPulsar.Internal;
+    using DotPulsar.Abstractions;
+    using FluentAssertions;
+    using Xunit;
+    using System;
+    using AutoFixture;
+    using System.Threading;
+    using System.Threading.Tasks;
+    using System.Collections.Generic;
+    using System.Linq;
+    using System.Linq.Expressions;
+    using AutoFixture.AutoNSubstitute;
+    using NSubstitute;
+    using System.Diagnostics;
+
+    public class UnackedMessageTrackerTests
+    {
+        [Fact]
+        public void Test_Instance()

Review comment:
       Done, removed in b57668b







[GitHub] [pulsar-dotpulsar] blankensteiner commented on pull request #67: Support for (optional) ack timeout and nack delay for consumers

Posted by GitBox <gi...@apache.org>.
blankensteiner commented on pull request #67:
URL: https://github.com/apache/pulsar-dotpulsar/pull/67#issuecomment-877702569


   Hi @jbvanzuylen 
   Good question :-)
   @dionjansen when you feel the PR is ready, poke me and I'll review it again :-)





[GitHub] [pulsar-dotpulsar] dionjansen commented on a change in pull request #67: Support for (optional) ack timeout and nack delay for consumers

Posted by GitBox <gi...@apache.org>.
dionjansen commented on a change in pull request #67:
URL: https://github.com/apache/pulsar-dotpulsar/pull/67#discussion_r552529562



##########
File path: src/DotPulsar/Internal/InactiveMessageAcksTracker.cs
##########
@@ -0,0 +1,30 @@
+namespace DotPulsar.Internal

Review comment:
       Done 7ad89a2







[GitHub] [pulsar-dotpulsar] blankensteiner commented on a change in pull request #67: Support for (optional) ack timeout and nack delay for consumers

Posted by GitBox <gi...@apache.org>.
blankensteiner commented on a change in pull request #67:
URL: https://github.com/apache/pulsar-dotpulsar/pull/67#discussion_r552588860



##########
File path: src/DotPulsar/Internal/UnackedMessageTracker.cs
##########
@@ -0,0 +1,101 @@
+namespace DotPulsar.Internal
+{
+    using Abstractions;
+    using DotPulsar.Abstractions;
+    using System;
+    using System.Collections.Concurrent;
+    using System.Diagnostics;
+    using System.Linq;
+    using System.Collections.Generic;
+    using System.Threading;
+    using System.Threading.Tasks;
+
+    public readonly struct AwaitingAck
+    {
+        public MessageId MessageId { get; }
+        public long Timestamp { get; }
+
+        public AwaitingAck(MessageId messageId)
+        {
+            MessageId = messageId;
+            Timestamp = Stopwatch.GetTimestamp();
+        }
+
+        public TimeSpan Elapsed => TimeSpan.FromTicks(
+            (long) ((Stopwatch.GetTimestamp() - Timestamp) / (double)Stopwatch.Frequency * TimeSpan.TicksPerSecond));
+    }
+
+    public sealed class UnackedMessageTracker : IUnackedMessageTracker
+    {
+        private readonly TimeSpan _ackTimeout;
+        private readonly TimeSpan _pollingTimeout;
+        private readonly ConcurrentQueue<AwaitingAck> _awaitingAcks;
+        private readonly List<MessageId> _acked;
+        private readonly CancellationTokenSource _cancellationTokenSource;
+
+        
+        public UnackedMessageTracker(TimeSpan ackTimeout, TimeSpan pollingTimeout)
+        {
+            _ackTimeout = ackTimeout;
+            _pollingTimeout = pollingTimeout;
+            _awaitingAcks = new ConcurrentQueue<AwaitingAck>();
+            _acked = new List<MessageId>();
+            _cancellationTokenSource = new CancellationTokenSource();
+        }
+
+        public void Add(MessageId messageId)
+        {
+            _awaitingAcks.Enqueue(new AwaitingAck(messageId));
+        }
+
+        public void Ack(MessageId messageId)
+        {
+            // We only need to store the highest cumulative ack we see (if there is one)
+            // and the MessageIds not included by that cumulative ack.
+            _acked.Add(messageId);
+        }
+
+        public Task Start(IConsumer consumer, CancellationToken cancellationToken = default)
+        {
+            CancellationToken token =
+              CancellationTokenSource.CreateLinkedTokenSource(
+                  _cancellationTokenSource.Token, cancellationToken).Token;
+
+            return Task.Run(async () => {

Review comment:
       Ah, yes, sorry, I meant to say that it doesn't make much sense.
   In this case, you have a method that returns a task, and if people await it, they will wait for it to complete whether or not you wrap the loop in Task.Run, since you are returning the task.
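
To make the point concrete, here is a small sketch of a polling loop written as a plain async method without Task.Run. The names (`PollingLoop`, `Iterations`) are hypothetical and the loop body is a placeholder for the real check. The method runs synchronously only until its first incomplete await and then hands a Task back to the caller, so a caller that does not await it is not held up.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical sketch: an async polling loop that needs no Task.Run wrapper.
public sealed class PollingLoop
{
    private int _iterations;

    public int Iterations => Volatile.Read(ref _iterations);

    public async Task Start(TimeSpan interval, CancellationToken token)
    {
        while (!token.IsCancellationRequested)
        {
            // Placeholder for "check what needs to be redelivered".
            Interlocked.Increment(ref _iterations);

            // Control returns to the caller here on the first pass.
            try { await Task.Delay(interval, token).ConfigureAwait(false); }
            catch (OperationCanceledException) { break; }
        }
    }
}
```

A caller would typically keep the returned task in a field (`var running = loop.Start(interval, cts.Token);`), cancel the token on dispose, and only then await or observe `running`. Task.Run would still be worth considering if the code before the first await did heavy synchronous work, which is not the case in this sketch.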







[GitHub] [pulsar-dotpulsar] dionjansen commented on a change in pull request #67: Support for (optional) ack timeout and nack delay for consumers

Posted by GitBox <gi...@apache.org>.
dionjansen commented on a change in pull request #67:
URL: https://github.com/apache/pulsar-dotpulsar/pull/67#discussion_r552892420



##########
File path: tests/DotPulsar.Tests/Internal/MessageAcksTrackerTests.cs
##########
@@ -0,0 +1,172 @@
+namespace DotPulsar.Tests.Internal
+{
+    using DotPulsar.Internal;
+    using DotPulsar.Abstractions;
+    using FluentAssertions;
+    using Xunit;
+    using System;
+    using AutoFixture;
+    using System.Threading;
+    using System.Threading.Tasks;
+    using System.Collections.Generic;
+    using System.Linq;
+    using System.Linq.Expressions;
+    using AutoFixture.AutoNSubstitute;
+    using NSubstitute;
+    using System.Diagnostics;
+
+    public class UnackedMessageTrackerTests
+    {
+        [Fact]
+        public void Test_Instance()
+        {
+            var tracker = new UnackedMessageTracker(TimeSpan.FromSeconds(10), TimeSpan.FromSeconds(1));
+            tracker.Should().BeOfType<UnackedMessageTracker>();
+        }
+
+
+        [Fact]
+        public async void Test_AwaitingAck_Elapsed()
+        {
+            //Arrange
+            var messageId = MessageId.Latest;
+            var sw = new Stopwatch();
+            sw.Start();
+
+            //Act
+            var awaiting = new AwaitingAck(messageId);
+            await Task.Delay(TimeSpan.FromMilliseconds(123));
+            sw.Stop();
+
+            //Assert
+            awaiting.Elapsed.Should().BeCloseTo(sw.Elapsed, 1);
+        }
+
+        [Fact]
+        public async void Test_Start_Message()
+        {
+            //Arrange
+            var fixture = new Fixture();
+            fixture.Customize(new AutoNSubstituteCustomization());
+            var consumer = Substitute.For<IConsumer>();
+            var messageId = MessageId.Latest;
+            var cts = new CancellationTokenSource();
+
+
+            var tracker = new UnackedMessageTracker(
+                TimeSpan.FromMilliseconds(10),
+                TimeSpan.FromMilliseconds(1));
+
+            //Act
+            tracker.Add(messageId);
+            cts.CancelAfter(20);
+            try { await tracker.Start(consumer, cts.Token); }
+            catch (TaskCanceledException) { }
+
+            //Assert
+            await consumer
+                .Received(1)
+                .RedeliverUnacknowledgedMessages(
+                    Arg.Is(EquivalentTo(new List<MessageId>() { messageId })),
+                    Arg.Any<CancellationToken>());
+        }
+
+        [Fact]
+        public async void Test_Start_Message_Ack_In_Time()
+        {
+            //Arrange
+            var fixture = new Fixture();
+            fixture.Customize(new AutoNSubstituteCustomization());
+            var consumer = Substitute.For<IConsumer>();
+            var messageId = MessageId.Latest;
+            var cts = new CancellationTokenSource();
+
+
+            var tracker = new UnackedMessageTracker(
+                TimeSpan.FromMilliseconds(10),
+                TimeSpan.FromMilliseconds(1));
+
+            //Act
+            tracker.Add(messageId);
+            cts.CancelAfter(20);
+            var _ = Task.Delay(5).ContinueWith(_ => tracker.Ack(messageId));
+            try { await tracker.Start(consumer, cts.Token); }
+            catch (TaskCanceledException) { }
+
+            //Assert
+            await consumer
+                .DidNotReceive()
+                .RedeliverUnacknowledgedMessages(
+                    Arg.Any<IEnumerable<MessageId>>(),
+                    Arg.Any<CancellationToken>());
+        }
+
+        [Fact]
+        public async void Test_Start_Message_Ack_Too_Late()
+        {
+            //Arrange
+            var fixture = new Fixture();
+            fixture.Customize(new AutoNSubstituteCustomization());
+            var consumer = Substitute.For<IConsumer>();
+            var messageId = MessageId.Latest;
+            var cts = new CancellationTokenSource();
+
+
+            var tracker = new UnackedMessageTracker(
+                TimeSpan.FromMilliseconds(10),
+                TimeSpan.FromMilliseconds(1));
+
+            //Act
+            tracker.Add(messageId);
+            cts.CancelAfter(20);
+
+            var _ = Task.Delay(15).ContinueWith(_ => tracker.Ack(messageId));
+            try { await tracker.Start(consumer, cts.Token); }
+            catch (TaskCanceledException) { }
+
+            //Assert
+            await consumer
+                .Received(1)
+                .RedeliverUnacknowledgedMessages(
+                    Arg.Any<IEnumerable<MessageId>>(),
+                    Arg.Any<CancellationToken>());
+        }
+
+        [Fact]
+        public async void Test_Start_Redeliver_Only_Once()
+        {
+            //Arrange
+            var fixture = new Fixture();
+            fixture.Customize(new AutoNSubstituteCustomization());
+            var consumer = Substitute.For<IConsumer>();
+            var messageId = MessageId.Latest;
+            var cts = new CancellationTokenSource();
+
+
+            var tracker = new UnackedMessageTracker(
+                TimeSpan.FromMilliseconds(10),
+                TimeSpan.FromMilliseconds(5));
+
+            //Act
+            tracker.Add(messageId);
+            cts.CancelAfter(50);
+            try { await tracker.Start(consumer, cts.Token); }
+            catch (TaskCanceledException) { }
+
+            //Assert
+            await consumer
+                .Received(1)
+                .RedeliverUnacknowledgedMessages(
+                    Arg.Any<IEnumerable<MessageId>>(),
+                    Arg.Any<CancellationToken>());
+        }
+
+
+        private Expression<Predicate<IEnumerable<T>>> EquivalentTo<T>(IEnumerable<T> enumerable) =>
+            x => IsEquivalentIEnumerable(enumerable, x);
+
+
+        private bool IsEquivalentIEnumerable<T>(IEnumerable<T> a, IEnumerable<T> b) =>
+            a.Count() == b.Count() && a.Zip(b, (a_, b_) => a_.Equals(b_)).All(_ => _);
+    }
+}

Review comment:
       Fixed in b57668b







[GitHub] [pulsar-dotpulsar] dionjansen commented on a change in pull request #67: Support for (optional) ack timeout and nack delay for consumers

Posted by GitBox <gi...@apache.org>.
dionjansen commented on a change in pull request #67:
URL: https://github.com/apache/pulsar-dotpulsar/pull/67#discussion_r552555901



##########
File path: src/DotPulsar/Internal/MessageQueue.cs
##########
@@ -0,0 +1,45 @@
+namespace DotPulsar.Internal

Review comment:
       Done 7ad89a2







[GitHub] [pulsar-dotpulsar] dionjansen commented on a change in pull request #67: Support for (optional) ack timeout and nack delay for consumers

Posted by GitBox <gi...@apache.org>.
dionjansen commented on a change in pull request #67:
URL: https://github.com/apache/pulsar-dotpulsar/pull/67#discussion_r558418667



##########
File path: src/DotPulsar/Internal/UnackedMessageTracker.cs
##########
@@ -0,0 +1,115 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace DotPulsar.Internal
+{
+    using Abstractions;
+    using DotPulsar.Abstractions;
+    using System;
+    using System.Collections.Concurrent;
+    using System.Diagnostics;
+    using System.Linq;
+    using System.Collections.Generic;
+    using System.Threading;
+    using System.Threading.Tasks;
+
+    public readonly struct AwaitingAck
+    {
+        public MessageId MessageId { get; }
+        public long Timestamp { get; }
+
+        public AwaitingAck(MessageId messageId)
+        {
+            MessageId = messageId;
+            Timestamp = Stopwatch.GetTimestamp();
+        }
+
+        public TimeSpan Elapsed => TimeSpan.FromTicks(
+            (long) ((Stopwatch.GetTimestamp() - Timestamp) / (double) Stopwatch.Frequency * TimeSpan.TicksPerSecond));
+    }
+
+    public sealed class UnackedMessageTracker : IUnackedMessageTracker
+    {
+        private readonly TimeSpan _ackTimeout;
+        private readonly TimeSpan _pollingTimeout;
+        private readonly ConcurrentQueue<AwaitingAck> _awaitingAcks;
+        private readonly List<MessageId> _acked;
+        private readonly CancellationTokenSource _cancellationTokenSource;
+
+        public UnackedMessageTracker(TimeSpan ackTimeout, TimeSpan pollingTimeout)
+        {
+            _ackTimeout = ackTimeout;
+            _pollingTimeout = pollingTimeout;
+            _awaitingAcks = new ConcurrentQueue<AwaitingAck>();
+            _acked = new List<MessageId>();
+            _cancellationTokenSource = new CancellationTokenSource();
+        }
+
+        public void Add(MessageId messageId)
+        {
+            _awaitingAcks.Enqueue(new AwaitingAck(messageId));
+        }
+
+        public void Ack(MessageId messageId)
+        {
+            // We only need to store the highest cumulative ack we see (if there is one)
+            // and the MessageIds not included by that cumulative ack.
+            _acked.Add(messageId);
+        }
+
+        public Task Start(IConsumer consumer, CancellationToken cancellationToken = default)
+        {
+            CancellationToken token =
+              CancellationTokenSource.CreateLinkedTokenSource(
+                  _cancellationTokenSource.Token, cancellationToken).Token;
+
+            return Task.Run(async () =>
+            {
+                while (!token.IsCancellationRequested)
+                {
+                    var messages = CheckUnackedMessages();
+
+                    if (messages.Count() > 0)
+                        await consumer.RedeliverUnacknowledgedMessages(messages, token);
+
+                    await Task.Delay(_pollingTimeout, token);
+                }
+            }, token);
+        }
+
+        private IEnumerable<MessageId> CheckUnackedMessages()
+        {
+            var result = new List<MessageId>();
+
+            while (_awaitingAcks.TryPeek(out AwaitingAck awaiting)
+                && awaiting.Elapsed > _ackTimeout)
+            {
+                if (_awaitingAcks.TryDequeue(out awaiting))
+                {
+                    if (!_acked.Contains(awaiting.MessageId))
+                        result.Add(awaiting.MessageId);
+                    else
+                        _acked.Remove(awaiting.MessageId);
+                }
+            }
+
+            return result;
+        }
+
+        public void Dispose()
+        {
+            this._cancellationTokenSource.Cancel();

Review comment:
       Done: ddfd374

##########
File path: src/DotPulsar/Internal/UnackedMessageTracker.cs
##########
@@ -0,0 +1,115 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace DotPulsar.Internal
+{
+    using Abstractions;
+    using DotPulsar.Abstractions;
+    using System;
+    using System.Collections.Concurrent;
+    using System.Diagnostics;
+    using System.Linq;
+    using System.Collections.Generic;
+    using System.Threading;
+    using System.Threading.Tasks;
+
+    public readonly struct AwaitingAck
+    {
+        public MessageId MessageId { get; }
+        public long Timestamp { get; }
+
+        public AwaitingAck(MessageId messageId)
+        {
+            MessageId = messageId;
+            Timestamp = Stopwatch.GetTimestamp();
+        }
+
+        public TimeSpan Elapsed => TimeSpan.FromTicks(
+            (long) ((Stopwatch.GetTimestamp() - Timestamp) / (double) Stopwatch.Frequency * TimeSpan.TicksPerSecond));
+    }
+
+    public sealed class UnackedMessageTracker : IUnackedMessageTracker

Review comment:
       Done: ddfd374




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar-dotpulsar] dionjansen commented on a change in pull request #67: Support for (optional) ack timeout and nack delay for consumers

Posted by GitBox <gi...@apache.org>.
dionjansen commented on a change in pull request #67:
URL: https://github.com/apache/pulsar-dotpulsar/pull/67#discussion_r552562735



##########
File path: tests/DotPulsar.StressTests/DotPulsar.StressTests.csproj
##########
@@ -7,6 +7,7 @@
 
   <ItemGroup>
     <PackageReference Include="Microsoft.NET.Test.Sdk" Version="16.8.3" />
+    <PackageReference Include="Microsoft.Bcl.AsyncInterfaces" Version="5.0.0" />

Review comment:
       ah yes done here https://github.com/apache/pulsar-dotpulsar/pull/67/commits/98850bb8a5c0e6ade58714cf6c8da00527d258db







[GitHub] [pulsar-dotpulsar] dionjansen commented on a change in pull request #67: Support for (optional) ack timeout and nack delay for consumers

Posted by GitBox <gi...@apache.org>.
dionjansen commented on a change in pull request #67:
URL: https://github.com/apache/pulsar-dotpulsar/pull/67#discussion_r552529784



##########
File path: src/DotPulsar/Internal/InactiveUnackedMessageTracker.cs
##########
@@ -0,0 +1,31 @@
+namespace DotPulsar.Internal

Review comment:
       Done 7ad89a2







[GitHub] [pulsar-dotpulsar] dionjansen commented on a change in pull request #67: Support for (optional) ack timeout and nack delay for consumers

Posted by GitBox <gi...@apache.org>.
dionjansen commented on a change in pull request #67:
URL: https://github.com/apache/pulsar-dotpulsar/pull/67#discussion_r552528290



##########
File path: src/DotPulsar/Internal/Abstractions/IMessageAcksTracker.cs
##########
@@ -0,0 +1,13 @@
+namespace DotPulsar.Internal.Abstractions

Review comment:
       Done https://github.com/apache/pulsar-dotpulsar/pull/67/commits/7ad89a20a97bfc9ff1770ee1ae2abdda952870f6







[GitHub] [pulsar-dotpulsar] dionjansen commented on a change in pull request #67: Support for (optional) ack timeout and nack delay for consumers

Posted by GitBox <gi...@apache.org>.
dionjansen commented on a change in pull request #67:
URL: https://github.com/apache/pulsar-dotpulsar/pull/67#discussion_r552557117



##########
File path: src/DotPulsar/Internal/InactiveUnackedMessageTracker.cs
##########
@@ -0,0 +1,31 @@
+namespace DotPulsar.Internal
+{
+    using System.Threading;

Review comment:
       Done https://github.com/apache/pulsar-dotpulsar/pull/67/commits/a7d5f02704d2ab598377009bf1115794c1a6db2b







[GitHub] [pulsar-dotpulsar] blankensteiner commented on a change in pull request #67: Support for (optional) ack timeout and nack delay for consumers

Posted by GitBox <gi...@apache.org>.
blankensteiner commented on a change in pull request #67:
URL: https://github.com/apache/pulsar-dotpulsar/pull/67#discussion_r553295151



##########
File path: src/DotPulsar/Internal/InactiveUnackedMessageTracker.cs
##########
@@ -0,0 +1,42 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace DotPulsar.Internal
+{
+    using Abstractions;
+    using DotPulsar.Abstractions;
+    using System.Threading;
+    using System.Threading.Tasks;
+
+    public class InactiveUnackedMessageTracker : IUnackedMessageTracker

Review comment:
       Let's make it sealed.

##########
File path: tests/DotPulsar.Tests/Internal/MessageAcksTrackerTests.cs
##########
@@ -0,0 +1,158 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace DotPulsar.Tests.Internal
+{
+    using AutoFixture;
+    using AutoFixture.AutoNSubstitute;
+    using DotPulsar.Abstractions;
+    using DotPulsar.Internal;
+    using NSubstitute;
+    using System;
+    using System.Collections.Generic;
+    using System.Linq;
+    using System.Linq.Expressions;
+    using System.Threading;
+    using System.Threading.Tasks;
+    using Xunit;
+
+    public class UnackedMessageTrackerTests
+    {
+
+        [Fact]
+        public async void Start_GivenAMessageIdIsNotAcked_ShouldRedeliver()
+        {
+            //Arrange
+            var fixture = new Fixture();
+            fixture.Customize(new AutoNSubstituteCustomization());
+            var consumer = Substitute.For<IConsumer>();
+            var messageId = MessageId.Latest;
+            var cts = new CancellationTokenSource();
+

Review comment:
       Let's remove this empty line

##########
File path: tests/DotPulsar.Tests/Internal/MessageAcksTrackerTests.cs
##########
@@ -0,0 +1,158 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace DotPulsar.Tests.Internal
+{
+    using AutoFixture;
+    using AutoFixture.AutoNSubstitute;
+    using DotPulsar.Abstractions;
+    using DotPulsar.Internal;
+    using NSubstitute;
+    using System;
+    using System.Collections.Generic;
+    using System.Linq;
+    using System.Linq.Expressions;
+    using System.Threading;
+    using System.Threading.Tasks;
+    using Xunit;
+
+    public class UnackedMessageTrackerTests
+    {
+
+        [Fact]
+        public async void Start_GivenAMessageIdIsNotAcked_ShouldRedeliver()
+        {
+            //Arrange
+            var fixture = new Fixture();
+            fixture.Customize(new AutoNSubstituteCustomization());
+            var consumer = Substitute.For<IConsumer>();
+            var messageId = MessageId.Latest;
+            var cts = new CancellationTokenSource();
+
+
+            var tracker = new UnackedMessageTracker(
+                TimeSpan.FromMilliseconds(10),
+                TimeSpan.FromMilliseconds(1));
+
+            //Act
+            tracker.Add(messageId);
+            cts.CancelAfter(20);
+            try { await tracker.Start(consumer, cts.Token); }
+            catch (TaskCanceledException) { }
+
+            //Assert
+            await consumer
+                .Received(1)
+                .RedeliverUnacknowledgedMessages(
+                    Arg.Is(EquivalentTo(new List<MessageId>() { messageId })),
+                    Arg.Any<CancellationToken>());
+        }
+
+        [Fact]
+        public async void Start_GivenAMessageIdIsAckedWithinTimeout_ShouldNotRedeliver()
+        {
+            //Arrange
+            var fixture = new Fixture();
+            fixture.Customize(new AutoNSubstituteCustomization());
+            var consumer = Substitute.For<IConsumer>();
+            var messageId = MessageId.Latest;
+            var cts = new CancellationTokenSource();
+
+
+            var tracker = new UnackedMessageTracker(
+                TimeSpan.FromMilliseconds(10),
+                TimeSpan.FromMilliseconds(1));
+
+            //Act
+            tracker.Add(messageId);
+            cts.CancelAfter(20);
+            var _ = Task.Delay(5).ContinueWith(_ => tracker.Ack(messageId));
+            try { await tracker.Start(consumer, cts.Token); }
+            catch (TaskCanceledException) { }
+
+            //Assert
+            await consumer
+                .DidNotReceive()
+                .RedeliverUnacknowledgedMessages(
+                    Arg.Any<IEnumerable<MessageId>>(),
+                    Arg.Any<CancellationToken>());
+        }
+
+        [Fact]
+        public async void Start_GivenAMessageIdIsNotAckedWithinTimeout_ShouldRedeliver()
+        {
+            //Arrange
+            var fixture = new Fixture();
+            fixture.Customize(new AutoNSubstituteCustomization());
+            var consumer = Substitute.For<IConsumer>();
+            var messageId = MessageId.Latest;
+            var cts = new CancellationTokenSource();
+

Review comment:
       Let's remove this empty line

##########
File path: src/DotPulsar/Internal/UnackedMessageTracker.cs
##########
@@ -0,0 +1,115 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace DotPulsar.Internal
+{
+    using Abstractions;
+    using DotPulsar.Abstractions;
+    using System;
+    using System.Collections.Concurrent;
+    using System.Diagnostics;
+    using System.Linq;
+    using System.Collections.Generic;
+    using System.Threading;
+    using System.Threading.Tasks;
+
+    public readonly struct AwaitingAck
+    {
+        public MessageId MessageId { get; }
+        public long Timestamp { get; }
+
+        public AwaitingAck(MessageId messageId)
+        {
+            MessageId = messageId;
+            Timestamp = Stopwatch.GetTimestamp();
+        }
+
+        public TimeSpan Elapsed => TimeSpan.FromTicks(
+            (long) ((Stopwatch.GetTimestamp() - Timestamp) / (double) Stopwatch.Frequency * TimeSpan.TicksPerSecond));
+    }
+
+    public sealed class UnackedMessageTracker : IUnackedMessageTracker

Review comment:
       For the class and interface names, let's just use the full wording: [I]UnacknowledgedMessageTracker.

##########
File path: tests/DotPulsar.Tests/Internal/MessageAcksTrackerTests.cs
##########
@@ -0,0 +1,158 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace DotPulsar.Tests.Internal
+{
+    using AutoFixture;
+    using AutoFixture.AutoNSubstitute;
+    using DotPulsar.Abstractions;
+    using DotPulsar.Internal;
+    using NSubstitute;
+    using System;
+    using System.Collections.Generic;
+    using System.Linq;
+    using System.Linq.Expressions;
+    using System.Threading;
+    using System.Threading.Tasks;
+    using Xunit;
+
+    public class UnackedMessageTrackerTests
+    {
+

Review comment:
       Let's remove this empty line

##########
File path: tests/DotPulsar.Tests/Internal/MessageAcksTrackerTests.cs
##########
@@ -0,0 +1,158 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace DotPulsar.Tests.Internal
+{
+    using AutoFixture;
+    using AutoFixture.AutoNSubstitute;
+    using DotPulsar.Abstractions;
+    using DotPulsar.Internal;
+    using NSubstitute;
+    using System;
+    using System.Collections.Generic;
+    using System.Linq;
+    using System.Linq.Expressions;
+    using System.Threading;
+    using System.Threading.Tasks;
+    using Xunit;
+
+    public class UnackedMessageTrackerTests
+    {
+
+        [Fact]
+        public async void Start_GivenAMessageIdIsNotAcked_ShouldRedeliver()
+        {
+            //Arrange
+            var fixture = new Fixture();
+            fixture.Customize(new AutoNSubstituteCustomization());
+            var consumer = Substitute.For<IConsumer>();
+            var messageId = MessageId.Latest;
+            var cts = new CancellationTokenSource();
+
+
+            var tracker = new UnackedMessageTracker(
+                TimeSpan.FromMilliseconds(10),
+                TimeSpan.FromMilliseconds(1));
+
+            //Act
+            tracker.Add(messageId);
+            cts.CancelAfter(20);
+            try { await tracker.Start(consumer, cts.Token); }
+            catch (TaskCanceledException) { }
+
+            //Assert
+            await consumer
+                .Received(1)
+                .RedeliverUnacknowledgedMessages(
+                    Arg.Is(EquivalentTo(new List<MessageId>() { messageId })),
+                    Arg.Any<CancellationToken>());
+        }
+
+        [Fact]
+        public async void Start_GivenAMessageIdIsAckedWithinTimeout_ShouldNotRedeliver()
+        {
+            //Arrange
+            var fixture = new Fixture();
+            fixture.Customize(new AutoNSubstituteCustomization());
+            var consumer = Substitute.For<IConsumer>();
+            var messageId = MessageId.Latest;
+            var cts = new CancellationTokenSource();
+

Review comment:
       Let's remove this empty line

##########
File path: tests/DotPulsar.Tests/Internal/MessageAcksTrackerTests.cs
##########
@@ -0,0 +1,158 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace DotPulsar.Tests.Internal
+{
+    using AutoFixture;
+    using AutoFixture.AutoNSubstitute;
+    using DotPulsar.Abstractions;
+    using DotPulsar.Internal;
+    using NSubstitute;
+    using System;
+    using System.Collections.Generic;
+    using System.Linq;
+    using System.Linq.Expressions;
+    using System.Threading;
+    using System.Threading.Tasks;
+    using Xunit;
+
+    public class UnackedMessageTrackerTests
+    {
+
+        [Fact]
+        public async void Start_GivenAMessageIdIsNotAcked_ShouldRedeliver()
+        {
+            //Arrange
+            var fixture = new Fixture();
+            fixture.Customize(new AutoNSubstituteCustomization());
+            var consumer = Substitute.For<IConsumer>();
+            var messageId = MessageId.Latest;
+            var cts = new CancellationTokenSource();
+
+
+            var tracker = new UnackedMessageTracker(
+                TimeSpan.FromMilliseconds(10),
+                TimeSpan.FromMilliseconds(1));
+
+            //Act
+            tracker.Add(messageId);
+            cts.CancelAfter(20);
+            try { await tracker.Start(consumer, cts.Token); }
+            catch (TaskCanceledException) { }
+
+            //Assert
+            await consumer
+                .Received(1)
+                .RedeliverUnacknowledgedMessages(
+                    Arg.Is(EquivalentTo(new List<MessageId>() { messageId })),
+                    Arg.Any<CancellationToken>());
+        }
+
+        [Fact]
+        public async void Start_GivenAMessageIdIsAckedWithinTimeout_ShouldNotRedeliver()
+        {
+            //Arrange
+            var fixture = new Fixture();
+            fixture.Customize(new AutoNSubstituteCustomization());
+            var consumer = Substitute.For<IConsumer>();
+            var messageId = MessageId.Latest;
+            var cts = new CancellationTokenSource();
+
+
+            var tracker = new UnackedMessageTracker(
+                TimeSpan.FromMilliseconds(10),
+                TimeSpan.FromMilliseconds(1));
+
+            //Act
+            tracker.Add(messageId);
+            cts.CancelAfter(20);
+            var _ = Task.Delay(5).ContinueWith(_ => tracker.Ack(messageId));
+            try { await tracker.Start(consumer, cts.Token); }
+            catch (TaskCanceledException) { }
+
+            //Assert
+            await consumer
+                .DidNotReceive()
+                .RedeliverUnacknowledgedMessages(
+                    Arg.Any<IEnumerable<MessageId>>(),
+                    Arg.Any<CancellationToken>());
+        }
+
+        [Fact]
+        public async void Start_GivenAMessageIdIsNotAckedWithinTimeout_ShouldRedeliver()
+        {
+            //Arrange
+            var fixture = new Fixture();
+            fixture.Customize(new AutoNSubstituteCustomization());
+            var consumer = Substitute.For<IConsumer>();
+            var messageId = MessageId.Latest;
+            var cts = new CancellationTokenSource();
+
+
+            var tracker = new UnackedMessageTracker(
+                TimeSpan.FromMilliseconds(10),
+                TimeSpan.FromMilliseconds(1));
+
+            //Act
+            tracker.Add(messageId);
+            cts.CancelAfter(20);
+
+            var _ = Task.Delay(15).ContinueWith(_ => tracker.Ack(messageId));
+            try { await tracker.Start(consumer, cts.Token); }
+            catch (TaskCanceledException) { }
+
+            //Assert
+            await consumer
+                .Received(1)
+                .RedeliverUnacknowledgedMessages(
+                    Arg.Any<IEnumerable<MessageId>>(),
+                    Arg.Any<CancellationToken>());
+        }
+
+        [Fact]
+        public async void Start_GivenAMessageIdIsNotAckedWithinTimeout_ShouldRedeliverOnlyOnce()
+        {
+            //Arrange
+            var fixture = new Fixture();
+            fixture.Customize(new AutoNSubstituteCustomization());
+            var consumer = Substitute.For<IConsumer>();
+            var messageId = MessageId.Latest;
+            var cts = new CancellationTokenSource();
+

Review comment:
       Let's remove this empty line

##########
File path: src/DotPulsar/Internal/MessageQueue.cs
##########
@@ -0,0 +1,53 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace DotPulsar.Internal
+{
+    using Abstractions;
+    using System;
+    using System.Threading;
+    using System.Threading.Tasks;
+
+    public sealed class MessageQueue : IMessageQueue, IDequeue<MessagePackage>, IDisposable

Review comment:
       Is "IMessageQueue" enough here since it inherits the other two interfaces?
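
The question above can be illustrated with a small sketch. All type names here (`ISketchDequeue<T>`, `ISketchMessageQueue`, `SketchMessageQueue`) are hypothetical stand-ins, not DotPulsar's actual types; the only thing assumed from the review is that the queue interface already inherits the dequeue interface and `IDisposable`.

```csharp
using System;

// Hypothetical stand-in for a dequeue abstraction.
public interface ISketchDequeue<T>
{
    T Dequeue();
}

// Mirrors the shape under review: the queue interface inherits the other two.
public interface ISketchMessageQueue : ISketchDequeue<string>, IDisposable
{
    void Acknowledge(string messageId);
}

// Listing only ISketchMessageQueue is enough; the inherited interfaces
// (ISketchDequeue<string> and IDisposable) are implemented implicitly.
public sealed class SketchMessageQueue : ISketchMessageQueue
{
    public bool Disposed { get; private set; }
    public string LastAcknowledged { get; private set; } = "";

    public string Dequeue() => "message";
    public void Acknowledge(string messageId) => LastAcknowledged = messageId;
    public void Dispose() => Disposed = true;
}
```

An instance of `SketchMessageQueue` can still be assigned to a variable of type `IDisposable` or `ISketchDequeue<string>`, so repeating those interfaces on the class declaration adds nothing beyond documentation.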

##########
File path: src/DotPulsar/Internal/Abstractions/IUnackedMessageTracker.cs
##########
@@ -0,0 +1,30 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace DotPulsar.Internal.Abstractions
+{
+    using DotPulsar.Abstractions;
+    using System.Threading.Tasks;
+    using System.Threading;
+    using System;
+
+    public interface IUnackedMessageTracker : IDisposable
+    {
+        void Add(MessageId messageId);
+
+        void Ack(MessageId messageId);

Review comment:
       Let's use "Acknowledge" instead of the shorter "Ack", to be consistent with the public interfaces.

##########
File path: src/DotPulsar/Internal/UnackedMessageTracker.cs
##########
@@ -0,0 +1,115 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace DotPulsar.Internal
+{
+    using Abstractions;
+    using DotPulsar.Abstractions;
+    using System;
+    using System.Collections.Concurrent;
+    using System.Diagnostics;
+    using System.Linq;
+    using System.Collections.Generic;
+    using System.Threading;
+    using System.Threading.Tasks;
+
+    public readonly struct AwaitingAck
+    {
+        public MessageId MessageId { get; }
+        public long Timestamp { get; }
+
+        public AwaitingAck(MessageId messageId)
+        {
+            MessageId = messageId;
+            Timestamp = Stopwatch.GetTimestamp();
+        }
+
+        public TimeSpan Elapsed => TimeSpan.FromTicks(
+            (long) ((Stopwatch.GetTimestamp() - Timestamp) / (double) Stopwatch.Frequency * TimeSpan.TicksPerSecond));
+    }
+
+    public sealed class UnackedMessageTracker : IUnackedMessageTracker
+    {
+        private readonly TimeSpan _ackTimeout;
+        private readonly TimeSpan _pollingTimeout;
+        private readonly ConcurrentQueue<AwaitingAck> _awaitingAcks;
+        private readonly List<MessageId> _acked;
+        private readonly CancellationTokenSource _cancellationTokenSource;
+
+        public UnackedMessageTracker(TimeSpan ackTimeout, TimeSpan pollingTimeout)
+        {
+            _ackTimeout = ackTimeout;
+            _pollingTimeout = pollingTimeout;
+            _awaitingAcks = new ConcurrentQueue<AwaitingAck>();
+            _acked = new List<MessageId>();
+            _cancellationTokenSource = new CancellationTokenSource();
+        }
+
+        public void Add(MessageId messageId)
+        {
+            _awaitingAcks.Enqueue(new AwaitingAck(messageId));
+        }
+
+        public void Ack(MessageId messageId)
+        {
+            // We only need to store the highest cumulative ack we see (if there is one)
+            // and the MessageIds not included by that cumulative ack.
+            _acked.Add(messageId);
+        }
+
+        public Task Start(IConsumer consumer, CancellationToken cancellationToken = default)
+        {
+            CancellationToken token =
+              CancellationTokenSource.CreateLinkedTokenSource(
+                  _cancellationTokenSource.Token, cancellationToken).Token;
+
+            return Task.Run(async () =>
+            {
+                while (!token.IsCancellationRequested)
+                {
+                    var messages = CheckUnackedMessages();
+
+                    if (messages.Count() > 0)
+                        await consumer.RedeliverUnacknowledgedMessages(messages, token);
+
+                    await Task.Delay(_pollingTimeout, token);
+                }
+            }, token);
+        }
+
+        private IEnumerable<MessageId> CheckUnackedMessages()
+        {
+            var result = new List<MessageId>();
+
+            while (_awaitingAcks.TryPeek(out AwaitingAck awaiting)
+                && awaiting.Elapsed > _ackTimeout)
+            {
+                if (_awaitingAcks.TryDequeue(out awaiting))
+                {
+                    if (!_acked.Contains(awaiting.MessageId))
+                        result.Add(awaiting.MessageId);
+                    else
+                        _acked.Remove(awaiting.MessageId);
+                }
+            }
+
+            return result;
+        }
+
+        public void Dispose()
+        {
+            this._cancellationTokenSource.Cancel();

Review comment:
       No need to use "this."

##########
File path: src/DotPulsar/Internal/ConsumerChannel.cs
##########
@@ -108,6 +108,9 @@ public async Task Send(CommandAck command, CancellationToken cancellationToken)
             }
 
             command.ConsumerId = _id;
+
+            _queue.Acknowledge(new MessageId(messageId));

Review comment:
       Ah yes, in that case we might also have to implement IEquatable and IComparable on MessageIdData.
   When receiving an Ack, you also have to know if it is 'Individual' or 'Cumulative'.
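
As a rough illustration of the two points above, here is a minimal sketch under stated assumptions. `SketchMessageId`, `SketchAckKind` and `SketchAckState` are hypothetical names, not DotPulsar's `MessageId` or `MessageIdData`, and the ordering by ledger and then entry is an assumption made for the example. It shows why equality and ordering are needed: an individual acknowledgement is looked up by equality, while a cumulative one covers every id that compares less than or equal to it.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical stand-in for a message id; not DotPulsar's MessageId or MessageIdData.
public readonly struct SketchMessageId : IEquatable<SketchMessageId>, IComparable<SketchMessageId>
{
    public SketchMessageId(ulong ledgerId, ulong entryId)
    {
        LedgerId = ledgerId;
        EntryId = entryId;
    }

    public ulong LedgerId { get; }
    public ulong EntryId { get; }

    // Assumed ordering: by ledger first, then by entry within the ledger.
    public int CompareTo(SketchMessageId other)
    {
        var byLedger = LedgerId.CompareTo(other.LedgerId);
        return byLedger != 0 ? byLedger : EntryId.CompareTo(other.EntryId);
    }

    public bool Equals(SketchMessageId other)
        => LedgerId == other.LedgerId && EntryId == other.EntryId;

    public override bool Equals(object obj)
        => obj is SketchMessageId other && Equals(other);

    public override int GetHashCode() => HashCode.Combine(LedgerId, EntryId);
}

public enum SketchAckKind { Individual, Cumulative }

// Keeps the highest cumulative ack plus the individual acks above it.
// Not thread-safe; a real tracker would need synchronization.
public sealed class SketchAckState
{
    private readonly HashSet<SketchMessageId> _individual = new HashSet<SketchMessageId>();
    private SketchMessageId? _highestCumulative;

    public void Acknowledge(SketchMessageId id, SketchAckKind kind)
    {
        if (kind == SketchAckKind.Individual)
        {
            _individual.Add(id);
            return;
        }

        if (_highestCumulative is null || id.CompareTo(_highestCumulative.Value) > 0)
            _highestCumulative = id;

        // Individual acks at or below the highest cumulative ack are redundant.
        var highest = _highestCumulative.Value;
        _individual.RemoveWhere(x => x.CompareTo(highest) <= 0);
    }

    public bool IsAcknowledged(SketchMessageId id)
        => (_highestCumulative.HasValue && id.CompareTo(_highestCumulative.Value) <= 0)
           || _individual.Contains(id);
}
```

With this shape, a tracker only has to ask `IsAcknowledged` for each id whose timeout has elapsed, and the stored state stays small because a cumulative acknowledgement replaces all individual entries it covers.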







[GitHub] [pulsar-dotpulsar] dionjansen commented on a change in pull request #67: Support for (optional) ack timeout and nack delay for consumers

Posted by GitBox <gi...@apache.org>.
dionjansen commented on a change in pull request #67:
URL: https://github.com/apache/pulsar-dotpulsar/pull/67#discussion_r552528802



##########
File path: src/DotPulsar/Internal/Abstractions/IMessageAcksTracker.cs
##########
@@ -0,0 +1,13 @@
+namespace DotPulsar.Internal.Abstractions
+{
+    using DotPulsar.Abstractions;
+    using System.Threading;
+    using System.Threading.Tasks;
+    public interface IMessageAcksTracker<T>

Review comment:
       cleaned up in https://github.com/apache/pulsar-dotpulsar/pull/67/commits/0119ca95e00be46184f48ab847d5353cf7e6a74f

##########
File path: src/DotPulsar/Internal/Abstractions/IMessageAcksTracker.cs
##########
@@ -0,0 +1,13 @@
+namespace DotPulsar.Internal.Abstractions
+{
+    using DotPulsar.Abstractions;
+    using System.Threading;
+    using System.Threading.Tasks;
+    public interface IMessageAcksTracker<T>
+    {
+        T Add(T message);
+        T Ack(T message);
+        T Nack(T message);
+        Task StartTracker(IConsumer consumer, CancellationToken cancellationToken);
+    }
+}

Review comment:
       cleaned up in 0119ca9







[GitHub] [pulsar-dotpulsar] dionjansen commented on a change in pull request #67: Support for (optional) ack timeout and nack delay for consumers

Posted by GitBox <gi...@apache.org>.
dionjansen commented on a change in pull request #67:
URL: https://github.com/apache/pulsar-dotpulsar/pull/67#discussion_r552532114



##########
File path: src/DotPulsar/Internal/MessageAcksTracker.cs
##########
@@ -0,0 +1,103 @@
+namespace DotPulsar.Internal
+{
+    using Abstractions;
+    using DotPulsar.Abstractions;
+    using DotPulsar.Exceptions;
+    using Events;
+    using Microsoft.Extensions.ObjectPool;
+    using PulsarApi;
+    using System;
+    using System.Collections.Generic;
+    using System.Linq;
+    using System.Runtime.CompilerServices;
+    using System.Threading;
+    using System.Threading.Tasks;
+    using System.Diagnostics;
+
+    internal class Tracker

Review comment:
       Got it 👍 . I've cleaned up this implementation in https://github.com/apache/pulsar-dotpulsar/pull/67/commits/0119ca95e00be46184f48ab847d5353cf7e6a74f







[GitHub] [pulsar-dotpulsar] dionjansen commented on a change in pull request #67: Support for (optional) ack timeout and nack delay for consumers

Posted by GitBox <gi...@apache.org>.
dionjansen commented on a change in pull request #67:
URL: https://github.com/apache/pulsar-dotpulsar/pull/67#discussion_r552531082



##########
File path: src/DotPulsar/Internal/InactiveMessageAcksTracker.cs
##########
@@ -0,0 +1,30 @@
+namespace DotPulsar.Internal
+{
+    using Abstractions;
+    using DotPulsar.Abstractions;
+    using DotPulsar.Exceptions;
+    using Events;
+    using Microsoft.Extensions.ObjectPool;
+    using PulsarApi;
+    using System;
+    using System.Collections.Generic;
+    using System.Linq;
+    using System.Runtime.CompilerServices;
+    using System.Threading;
+    using System.Threading.Tasks;
+    using System.Diagnostics;
+
+    public sealed class InactiveMessageAcksTracker : IMessageAcksTracker<MessageId>
+    {
+        public InactiveMessageAcksTracker() { }
+
+        public async Task StartTracker(IConsumer consumer, CancellationToken cancellationToken)
+        {
+            await Task.Yield();
+        }
+
+        public MessageId Add(MessageId message) => message;
+        public MessageId Ack(MessageId message) => message;
+        public MessageId Nack(MessageId message) => message;
+    }
+}

Review comment:
       I've cleaned up this implementation in https://github.com/apache/pulsar-dotpulsar/pull/67/commits/0119ca95e00be46184f48ab847d5353cf7e6a74f







[GitHub] [pulsar-dotpulsar] dionjansen commented on a change in pull request #67: Support for (optional) ack timeout and nack delay for consumers

Posted by GitBox <gi...@apache.org>.
dionjansen commented on a change in pull request #67:
URL: https://github.com/apache/pulsar-dotpulsar/pull/67#discussion_r552532933



##########
File path: src/DotPulsar/Internal/MessageAcksTracker.cs
##########
@@ -0,0 +1,103 @@
+namespace DotPulsar.Internal
+{
+    using Abstractions;
+    using DotPulsar.Abstractions;
+    using DotPulsar.Exceptions;
+    using Events;
+    using Microsoft.Extensions.ObjectPool;
+    using PulsarApi;
+    using System;
+    using System.Collections.Generic;
+    using System.Linq;
+    using System.Runtime.CompilerServices;
+    using System.Threading;
+    using System.Threading.Tasks;
+    using System.Diagnostics;
+
+    internal class Tracker
+    {
+        private readonly Stopwatch _timer;
+        private int maxTimeoutMs;
+
+        public Tracker(int timeoutMs)
+        {
+            maxTimeoutMs = timeoutMs;
+            _timer = new Stopwatch();
+            _timer.Start();
+        }
+
+        public bool IsTimedOut() => _timer.ElapsedMilliseconds > maxTimeoutMs;
+
+        public long msTillTimeout => maxTimeoutMs - _timer.ElapsedMilliseconds;
+
+        public void Reset(int newTimeoutMs)
+        {
+            maxTimeoutMs = newTimeoutMs;
+            _timer.Restart();
+        }
+    }
+
+    // TODO add mechnism to stop tracker when disposed
+    public sealed class MessageAcksTracker : IMessageAcksTracker<MessageId>
+    {
+        private readonly Dictionary<MessageId, Tracker> _trackers;
+        private readonly int _unackedTimeoutMs;
+        private readonly int _nackTimeoutMs;
+        private readonly int _trackerDelayMs;
+        public MessageAcksTracker(int unackedTimeoutMs, int nackTimeoutMs, int trackerDelayMs)

Review comment:
       I've cleaned up this implementation in https://github.com/apache/pulsar-dotpulsar/pull/67/commits/0119ca95e00be46184f48ab847d5353cf7e6a74f

##########
File path: src/DotPulsar/Internal/MessageAcksTracker.cs
##########
@@ -0,0 +1,103 @@
+namespace DotPulsar.Internal
+{
+    using Abstractions;
+    using DotPulsar.Abstractions;
+    using DotPulsar.Exceptions;
+    using Events;
+    using Microsoft.Extensions.ObjectPool;
+    using PulsarApi;
+    using System;
+    using System.Collections.Generic;
+    using System.Linq;
+    using System.Runtime.CompilerServices;
+    using System.Threading;
+    using System.Threading.Tasks;
+    using System.Diagnostics;
+
+    internal class Tracker
+    {
+        private readonly Stopwatch _timer;
+        private int maxTimeoutMs;
+
+        public Tracker(int timeoutMs)
+        {
+            maxTimeoutMs = timeoutMs;
+            _timer = new Stopwatch();
+            _timer.Start();
+        }
+
+        public bool IsTimedOut() => _timer.ElapsedMilliseconds > maxTimeoutMs;
+
+        public long msTillTimeout => maxTimeoutMs - _timer.ElapsedMilliseconds;
+
+        public void Reset(int newTimeoutMs)
+        {
+            maxTimeoutMs = newTimeoutMs;
+            _timer.Restart();
+        }
+    }
+
+    // TODO add mechnism to stop tracker when disposed
+    public sealed class MessageAcksTracker : IMessageAcksTracker<MessageId>
+    {
+        private readonly Dictionary<MessageId, Tracker> _trackers;
+        private readonly int _unackedTimeoutMs;
+        private readonly int _nackTimeoutMs;
+        private readonly int _trackerDelayMs;
+        public MessageAcksTracker(int unackedTimeoutMs, int nackTimeoutMs, int trackerDelayMs)
+        {
+            _unackedTimeoutMs = unackedTimeoutMs;
+            _nackTimeoutMs = nackTimeoutMs;
+            _trackerDelayMs = trackerDelayMs;
+            _trackers = new Dictionary<MessageId, Tracker>();
+        }
+
+        public async Task StartTracker(IConsumer consumer, CancellationToken cancellationToken)
+        {
+            await Task.Yield();
+
+            while (true)
+            {
+                await Task.Delay(_trackerDelayMs);
+
+                var messageIds = new List<MessageId>();
+                foreach (KeyValuePair<MessageId, Tracker> p in _trackers)
+                {
+                    if (p.Value.IsTimedOut())
+                        messageIds.Add(p.Key);
+                }
+
+                if (messageIds.Count() > 0)
+                    await consumer.RedeliverUnacknowledgedMessages(messageIds, cancellationToken).ConfigureAwait(false);
+
+            }
+        }
+        public MessageId Add(MessageId message)
+        {
+            if (!_trackers.ContainsKey(message))
+            {
+                _trackers.Add(message, new Tracker(_unackedTimeoutMs));
+            }
+
+            return message;
+        }
+        public MessageId Ack(MessageId message)

Review comment:
       I've cleaned up this implementation in https://github.com/apache/pulsar-dotpulsar/pull/67/commits/0119ca95e00be46184f48ab847d5353cf7e6a74f

##########
File path: src/DotPulsar/Internal/MessageAcksTracker.cs
##########
@@ -0,0 +1,103 @@
+namespace DotPulsar.Internal
+{
+    using Abstractions;
+    using DotPulsar.Abstractions;
+    using DotPulsar.Exceptions;
+    using Events;
+    using Microsoft.Extensions.ObjectPool;
+    using PulsarApi;
+    using System;
+    using System.Collections.Generic;
+    using System.Linq;
+    using System.Runtime.CompilerServices;
+    using System.Threading;
+    using System.Threading.Tasks;
+    using System.Diagnostics;
+
+    internal class Tracker
+    {
+        private readonly Stopwatch _timer;
+        private int maxTimeoutMs;
+
+        public Tracker(int timeoutMs)
+        {
+            maxTimeoutMs = timeoutMs;
+            _timer = new Stopwatch();
+            _timer.Start();
+        }
+
+        public bool IsTimedOut() => _timer.ElapsedMilliseconds > maxTimeoutMs;
+
+        public long msTillTimeout => maxTimeoutMs - _timer.ElapsedMilliseconds;
+
+        public void Reset(int newTimeoutMs)
+        {
+            maxTimeoutMs = newTimeoutMs;
+            _timer.Restart();
+        }
+    }
+
+    // TODO add mechnism to stop tracker when disposed
+    public sealed class MessageAcksTracker : IMessageAcksTracker<MessageId>
+    {
+        private readonly Dictionary<MessageId, Tracker> _trackers;
+        private readonly int _unackedTimeoutMs;
+        private readonly int _nackTimeoutMs;
+        private readonly int _trackerDelayMs;
+        public MessageAcksTracker(int unackedTimeoutMs, int nackTimeoutMs, int trackerDelayMs)
+        {
+            _unackedTimeoutMs = unackedTimeoutMs;
+            _nackTimeoutMs = nackTimeoutMs;
+            _trackerDelayMs = trackerDelayMs;
+            _trackers = new Dictionary<MessageId, Tracker>();
+        }
+
+        public async Task StartTracker(IConsumer consumer, CancellationToken cancellationToken)
+        {
+            await Task.Yield();
+
+            while (true)
+            {
+                await Task.Delay(_trackerDelayMs);
+
+                var messageIds = new List<MessageId>();
+                foreach (KeyValuePair<MessageId, Tracker> p in _trackers)
+                {
+                    if (p.Value.IsTimedOut())
+                        messageIds.Add(p.Key);
+                }
+
+                if (messageIds.Count() > 0)
+                    await consumer.RedeliverUnacknowledgedMessages(messageIds, cancellationToken).ConfigureAwait(false);
+
+            }
+        }
+        public MessageId Add(MessageId message)
+        {
+            if (!_trackers.ContainsKey(message))
+            {
+                _trackers.Add(message, new Tracker(_unackedTimeoutMs));
+            }
+
+            return message;
+        }
+        public MessageId Ack(MessageId message)
+        {
+            if (_trackers.ContainsKey(message))
+                _trackers.Remove(message);
+            return message;
+        }
+        public MessageId Nack(MessageId message)
+        {
+            if (_trackers.ContainsKey(message))
+            {
+                var timer = _trackers[message];
+                if (timer.msTillTimeout > _nackTimeoutMs)
+                    timer.Reset(_nackTimeoutMs);
+            }
+            else
+                _trackers.Add(message, new Tracker(_nackTimeoutMs));
+            return message;
+        }
+    }
+}

Review comment:
       I've cleaned up this implementation in https://github.com/apache/pulsar-dotpulsar/pull/67/commits/0119ca95e00be46184f48ab847d5353cf7e6a74f







[GitHub] [pulsar-dotpulsar] dionjansen commented on a change in pull request #67: Support for (optional) ack timeout and nack delay for consumers

Posted by GitBox <gi...@apache.org>.
dionjansen commented on a change in pull request #67:
URL: https://github.com/apache/pulsar-dotpulsar/pull/67#discussion_r552892332



##########
File path: tests/DotPulsar.Tests/Internal/MessageAcksTrackerTests.cs
##########
@@ -0,0 +1,172 @@
+namespace DotPulsar.Tests.Internal
+{
+    using DotPulsar.Internal;
+    using DotPulsar.Abstractions;
+    using FluentAssertions;
+    using Xunit;
+    using System;
+    using AutoFixture;
+    using System.Threading;
+    using System.Threading.Tasks;
+    using System.Collections.Generic;
+    using System.Linq;
+    using System.Linq.Expressions;
+    using AutoFixture.AutoNSubstitute;
+    using NSubstitute;
+    using System.Diagnostics;
+
+    public class UnackedMessageTrackerTests
+    {
+        [Fact]
+        public void Test_Instance()
+        {
+            var tracker = new UnackedMessageTracker(TimeSpan.FromSeconds(10), TimeSpan.FromSeconds(1));
+            tracker.Should().BeOfType<UnackedMessageTracker>();
+        }
+

Review comment:
       Fixed in b57668b







[GitHub] [pulsar-dotpulsar] dionjansen commented on a change in pull request #67: Support for (optional) ack timeout and nack delay for consumers

Posted by GitBox <gi...@apache.org>.
dionjansen commented on a change in pull request #67:
URL: https://github.com/apache/pulsar-dotpulsar/pull/67#discussion_r552553274



##########
File path: src/DotPulsar/Internal/InactiveUnackedMessageTracker.cs
##########
@@ -0,0 +1,31 @@
+namespace DotPulsar.Internal
+{
+    using System.Threading;

Review comment:
       Do you run code cleanup through VSC? I used *Remove and sort using*, but that produces the order I have now.
   
   ![image](https://user-images.githubusercontent.com/5121647/103767382-cb9a6180-5020-11eb-8539-fa9df66d888a.png)
   
   But I can't find a code cleanup action in my IDE:
   
   ![image](https://user-images.githubusercontent.com/5121647/103767916-cab5ff80-5021-11eb-99a3-6fecdc340406.png)
   
   I also can't find any mention of it in the [docs](https://docs.microsoft.com/en-us/visualstudio/mac/editor-behavior?view=vsmac-2019). Are you using a different editor? For now I'll just keep the order by hand.







[GitHub] [pulsar-dotpulsar] dionjansen commented on a change in pull request #67: Support for (optional) ack timeout and nack delay for consumers

Posted by GitBox <gi...@apache.org>.
dionjansen commented on a change in pull request #67:
URL: https://github.com/apache/pulsar-dotpulsar/pull/67#discussion_r552892281



##########
File path: tests/DotPulsar.Tests/Internal/MessageAcksTrackerTests.cs
##########
@@ -0,0 +1,172 @@
+namespace DotPulsar.Tests.Internal
+{
+    using DotPulsar.Internal;
+    using DotPulsar.Abstractions;
+    using FluentAssertions;
+    using Xunit;
+    using System;
+    using AutoFixture;
+    using System.Threading;
+    using System.Threading.Tasks;
+    using System.Collections.Generic;
+    using System.Linq;
+    using System.Linq.Expressions;
+    using AutoFixture.AutoNSubstitute;
+    using NSubstitute;
+    using System.Diagnostics;
+
+    public class UnackedMessageTrackerTests
+    {
+        [Fact]
+        public void Test_Instance()
+        {
+            var tracker = new UnackedMessageTracker(TimeSpan.FromSeconds(10), TimeSpan.FromSeconds(1));
+            tracker.Should().BeOfType<UnackedMessageTracker>();
+        }
+
+
+        [Fact]
+        public async void Test_AwaitingAck_Elapsed()
+        {
+            //Arrange
+            var messageId = MessageId.Latest;
+            var sw = new Stopwatch();

Review comment:
       Updated in b57668b







[GitHub] [pulsar-dotpulsar] dionjansen commented on a change in pull request #67: Support for (optional) ack timeout and nack delay for consumers

Posted by GitBox <gi...@apache.org>.
dionjansen commented on a change in pull request #67:
URL: https://github.com/apache/pulsar-dotpulsar/pull/67#discussion_r552565957



##########
File path: src/DotPulsar/Internal/UnackedMessageTracker.cs
##########
@@ -0,0 +1,101 @@
+namespace DotPulsar.Internal
+{
+    using Abstractions;
+    using DotPulsar.Abstractions;
+    using System;
+    using System.Collections.Concurrent;
+    using System.Diagnostics;
+    using System.Linq;
+    using System.Collections.Generic;
+    using System.Threading;
+    using System.Threading.Tasks;
+
+    public readonly struct AwaitingAck
+    {
+        public MessageId MessageId { get; }
+        public long Timestamp { get; }
+
+        public AwaitingAck(MessageId messageId)
+        {
+            MessageId = messageId;
+            Timestamp = Stopwatch.GetTimestamp();
+        }
+
+        public TimeSpan Elapsed => TimeSpan.FromTicks(
+            (long) ((Stopwatch.GetTimestamp() - Timestamp) / (double)Stopwatch.Frequency * TimeSpan.TicksPerSecond));
+    }
+
+    public sealed class UnackedMessageTracker : IUnackedMessageTracker
+    {
+        private readonly TimeSpan _ackTimeout;
+        private readonly TimeSpan _pollingTimeout;
+        private readonly ConcurrentQueue<AwaitingAck> _awaitingAcks;
+        private readonly List<MessageId> _acked;
+        private readonly CancellationTokenSource _cancellationTokenSource;
+

Review comment:
       Done b78ac54







[GitHub] [pulsar-dotpulsar] dionjansen commented on a change in pull request #67: Support for (optional) ack timeout and nack delay for consumers

Posted by GitBox <gi...@apache.org>.
dionjansen commented on a change in pull request #67:
URL: https://github.com/apache/pulsar-dotpulsar/pull/67#discussion_r552529934



##########
File path: src/DotPulsar/Internal/MessageAcksTracker.cs
##########
@@ -0,0 +1,103 @@
+namespace DotPulsar.Internal

Review comment:
       Done 7ad89a2







[GitHub] [pulsar-dotpulsar] dionjansen commented on a change in pull request #67: Support for (optional) ack timeout and nack delay for consumers

Posted by GitBox <gi...@apache.org>.
dionjansen commented on a change in pull request #67:
URL: https://github.com/apache/pulsar-dotpulsar/pull/67#discussion_r552529667



##########
File path: src/DotPulsar/Internal/Abstractions/IUnackedMessageTracker.cs
##########
@@ -0,0 +1,16 @@
+namespace DotPulsar.Internal.Abstractions

Review comment:
       Done 7ad89a2







[GitHub] [pulsar-dotpulsar] dionjansen commented on a change in pull request #67: Support for (optional) ack timeout and nack delay for consumers

Posted by GitBox <gi...@apache.org>.
dionjansen commented on a change in pull request #67:
URL: https://github.com/apache/pulsar-dotpulsar/pull/67#discussion_r552892033



##########
File path: tests/DotPulsar.Tests/Internal/MessageAcksTrackerTests.cs
##########
@@ -0,0 +1,172 @@
+namespace DotPulsar.Tests.Internal
+{
+    using DotPulsar.Internal;
+    using DotPulsar.Abstractions;
+    using FluentAssertions;
+    using Xunit;
+    using System;
+    using AutoFixture;
+    using System.Threading;
+    using System.Threading.Tasks;
+    using System.Collections.Generic;
+    using System.Linq;
+    using System.Linq.Expressions;
+    using AutoFixture.AutoNSubstitute;
+    using NSubstitute;
+    using System.Diagnostics;
+
+    public class UnackedMessageTrackerTests
+    {
+        [Fact]
+        public void Test_Instance()
+        {
+            var tracker = new UnackedMessageTracker(TimeSpan.FromSeconds(10), TimeSpan.FromSeconds(1));
+            tracker.Should().BeOfType<UnackedMessageTracker>();
+        }
+
+
+        [Fact]
+        public async void Test_AwaitingAck_Elapsed()
+        {
+            //Arrange
+            var messageId = MessageId.Latest;
+            var sw = new Stopwatch();
+            sw.Start();
+
+            //Act
+            var awaiting = new AwaitingAck(messageId);
+            await Task.Delay(TimeSpan.FromMilliseconds(123));
+            sw.Stop();
+
+            //Assert
+            awaiting.Elapsed.Should().BeCloseTo(sw.Elapsed, 1);
+        }
+
+        [Fact]
+        public async void Test_Start_Message()

Review comment:
       Done in b57668b







[GitHub] [pulsar-dotpulsar] dionjansen commented on a change in pull request #67: Support for (optional) ack timeout and nack delay for consumers

Posted by GitBox <gi...@apache.org>.
dionjansen commented on a change in pull request #67:
URL: https://github.com/apache/pulsar-dotpulsar/pull/67#discussion_r558434707



##########
File path: src/DotPulsar/Internal/ConsumerChannel.cs
##########
@@ -108,6 +108,9 @@ public async Task Send(CommandAck command, CancellationToken cancellationToken)
             }
 
             command.ConsumerId = _id;
+
+            _queue.Acknowledge(new MessageId(messageId));

Review comment:
       OK, I addressed the MessageId issue by implementing the right interfaces for comparison in 8841930. I'm not sure about the extension pattern for MessageIdData though (using partials). I refactored the use of MessageIdData in ad5827114c8d019aac86d300818b718ae8ab9d19, and all tests pass.







[GitHub] [pulsar-dotpulsar] dionjansen commented on a change in pull request #67: Support for (optional) ack timeout and nack delay for consumers

Posted by GitBox <gi...@apache.org>.
dionjansen commented on a change in pull request #67:
URL: https://github.com/apache/pulsar-dotpulsar/pull/67#discussion_r552894826



##########
File path: src/DotPulsar/Internal/ConsumerChannel.cs
##########
@@ -108,6 +108,9 @@ public async Task Send(CommandAck command, CancellationToken cancellationToken)
             }
 
             command.ConsumerId = _id;
+
+            _queue.Acknowledge(new MessageId(messageId));

Review comment:
       This would mean we'd need to start using `MessageIdData` for both the tracker and the `MessageQueue`. Is this desirable? I thought the idea was to use `MessageId` for the tracker because it is easier to work with (it implements `IEquatable` and `IComparable`). If performance is more important, I can refactor to `MessageIdData`.







[GitHub] [pulsar-dotpulsar] blankensteiner commented on pull request #67: Support for (optional) ack timeout and nack delay for consumers

Posted by GitBox <gi...@apache.org>.
blankensteiner commented on pull request #67:
URL: https://github.com/apache/pulsar-dotpulsar/pull/67#issuecomment-755206565


   Hi @dionjansen 
   Before doing a deep dive into the implementation, I have some comments on little things that can quickly be fixed :-)





[GitHub] [pulsar-dotpulsar] dionjansen commented on a change in pull request #67: Support for (optional) ack timeout and nack delay for consumers

Posted by GitBox <gi...@apache.org>.
dionjansen commented on a change in pull request #67:
URL: https://github.com/apache/pulsar-dotpulsar/pull/67#discussion_r552556164



##########
File path: src/DotPulsar/Internal/UnackedMessageTracker.cs
##########
@@ -0,0 +1,101 @@
+namespace DotPulsar.Internal

Review comment:
       Done 7ad89a2







[GitHub] [pulsar-dotpulsar] blankensteiner commented on a change in pull request #67: Support for (optional) ack timeout and nack delay for consumers

Posted by GitBox <gi...@apache.org>.
blankensteiner commented on a change in pull request #67:
URL: https://github.com/apache/pulsar-dotpulsar/pull/67#discussion_r552592326



##########
File path: src/DotPulsar/Internal/InactiveUnackedMessageTracker.cs
##########
@@ -0,0 +1,31 @@
+namespace DotPulsar.Internal
+{
+    using System.Threading;

Review comment:
       Ah. In my VS (Community at home and Professional at work, both on Windows) there is an "Analyze" menu in the toolbar with a "Clean up" submenu, allowing you to run a Code Cleanup on the solution.
   The IDE should respect the .editorconfig and place usings alphabetically.
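
For illustration only, an .editorconfig fragment along these lines would express the using-directive convention mentioned above. The two keys are standard .NET code-style options, but the values are assumptions inferred from the diffs in this thread (usings inside the namespace, `System` not sorted first); the repository's actual file may differ.

```ini
# Hypothetical fragment; check the repository's own .editorconfig for the real values.
[*.cs]
# Sort all usings alphabetically instead of placing System.* first.
dotnet_sort_system_directives_first = false
# Keep using directives inside the namespace declaration.
csharp_using_directive_placement = inside_namespace:suggestion
```

On editors without a Code Cleanup action, the `dotnet format` command-line tool may be an alternative, since it reads the same file.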







[GitHub] [pulsar-dotpulsar] dionjansen commented on pull request #67: Support for (optional) ack timeout and nack delay for consumers

Posted by GitBox <gi...@apache.org>.
dionjansen commented on pull request #67:
URL: https://github.com/apache/pulsar-dotpulsar/pull/67#issuecomment-885772249


   @blankensteiner I reworked the implementation from scratch based on version [1.1.2](https://github.com/apache/pulsar-dotpulsar/releases/tag/1.1.2) and opened a new PR https://github.com/apache/pulsar-dotpulsar/pull/83. I've also added support for negative acknowledgement delays next to the unacked tracking. 
   
   Closing this PR since it's no longer needed.
   
   CC: @jbvanzuylen 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar-dotpulsar] dionjansen commented on a change in pull request #67: Support for (optional) ack timeout and nack delay for consumers

Posted by GitBox <gi...@apache.org>.
dionjansen commented on a change in pull request #67:
URL: https://github.com/apache/pulsar-dotpulsar/pull/67#discussion_r552529280



##########
File path: src/DotPulsar/Internal/Abstractions/IMessageQueue.cs
##########
@@ -0,0 +1,10 @@
+namespace DotPulsar.Internal.Abstractions

Review comment:
       Done https://github.com/apache/pulsar-dotpulsar/pull/67/commits/7ad89a20a97bfc9ff1770ee1ae2abdda952870f6







[GitHub] [pulsar-dotpulsar] dionjansen closed pull request #67: Support for (optional) ack timeout and nack delay for consumers

Posted by GitBox <gi...@apache.org>.
dionjansen closed pull request #67:
URL: https://github.com/apache/pulsar-dotpulsar/pull/67


   





[GitHub] [pulsar-dotpulsar] dionjansen commented on pull request #67: Support for (optional) ack timeout and nack delay for consumers

Posted by GitBox <gi...@apache.org>.
dionjansen commented on pull request #67:
URL: https://github.com/apache/pulsar-dotpulsar/pull/67#issuecomment-753657484


   @blankensteiner I've tried to follow your suggestions and simplify the implementation a bit: I focussed just on the unacked message tracker. It is probably a good idea anyway to separate these two mechanisms (nack tracking and unacked tracking), since they are configured independently of each other.
   
   > Feel free to add one or more of these (if you need them):
   > 
   > * AutoFixture
   > * AutoFixture.AutoNSubstitute
   > * AutoFixture.Xunit2
   > * NSubstitute
   > 
   
   Added `NSubstitute` and `AutoFixture.AutoNSubstitute`. I started testing the unacked tracker with this, but I'm not sure I'm using the correct pattern to verify whether messages are being redelivered under different test conditions. Since I don't see any other unit tests that test internal classes in this way, I'm not sure this strategy agrees with the rest of the lib, so let me know what you think.
   
   > The question is if you actually need to consumer or just the consumer channel. The tracking should start and end together with the MessageQueue/ConsumerChannel.
   
   I am starting the thread now in the Pulsar client https://github.com/apache/pulsar-dotpulsar/blob/26cd957a56b6349f9e37bf76bd37f92a7a7e0970/src/DotPulsar/PulsarClient.cs#L81-L84
   
   The tracker is then passed to the channel factory, so it can be passed to a message queue, which in turn is passed to the channel when it is created. The tracker loop is stopped when the tracker is disposed, which happens when the message queue is disposed, which in turn happens when the channel is disposed. I'm still passing `IConsumer` to `Start` to avoid duplicating the implementation of `RedeliverUnacknowledgedMessages` in the consumer.
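   
   As a rough illustration of that lifecycle (not the code in this PR), a polling loop that stops when its owner is disposed could look like the sketch below. `PollingLoop`, `onTick` and `Iterations` are hypothetical names; the only assumption is that the tracker owns a `CancellationTokenSource` and cancels it on dispose.
   
   ```csharp
   using System;
   using System.Threading;
   using System.Threading.Tasks;
   
   // Hypothetical sketch: a background loop whose lifetime ends with Dispose().
   public sealed class PollingLoop : IDisposable
   {
       private readonly CancellationTokenSource _cts = new CancellationTokenSource();
       private readonly TimeSpan _interval;
       private int _iterations;
   
       public PollingLoop(TimeSpan interval) => _interval = interval;
   
       // Number of completed ticks, exposed only to make the sketch observable.
       public int Iterations => Volatile.Read(ref _iterations);
   
       public Task Start(Action onTick)
       {
           // Capture the token once so the loop never reads _cts.Token after Dispose().
           var token = _cts.Token;
   
           return Task.Run(async () =>
           {
               try
               {
                   while (!token.IsCancellationRequested)
                   {
                       await Task.Delay(_interval, token).ConfigureAwait(false);
                       onTick();
                       Interlocked.Increment(ref _iterations);
                   }
               }
               catch (OperationCanceledException)
               {
                   // Expected when Dispose() cancels a pending delay.
               }
           });
       }
   
       public void Dispose()
       {
           _cts.Cancel();
           _cts.Dispose();
       }
   }
   ```
   
   Because the token is captured once in `Start`, the loop does not touch the `CancellationTokenSource` after it has been disposed, and the returned task completes normally instead of faulting.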
   
   > That's a really good question. Bookkeeper stores batched messages as one, so if you have a batched message consisting of 5 messages and you ack 4 of them, but the last times out and you ask the broker to redeliver, you will get the entire batch again. You could keep track of this, but it will hurt performance. I don't know what the other clients do here, but maybe you could test that?
   
   From what I can see in the [`ConsumerImpl` in Java](https://github.com/apache/pulsar/blob/6926180966f45eb9c1499b7f0eb32ea2a1368fd6/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java#L1437-L1448), for batch messages only one item is put in the tracker per batch, keyed by `(ledgerId, entryId, partitionIndex)`. Then, [when acking](https://github.com/apache/pulsar/blob/cc64889abe94d47f048e2f8e8fb10d6c37e695ec/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java#L602-L616), it looks like the batch is again treated as a single message, but only if `markAckForBatchMessage` returns false, which indicates that not all messages in the batch have been acked yet. I don't really see from this implementation how they *"keep track"* of what to ack within the batch, but there is a lot going on here that I can't make sense of. Does the mechanism for adding to and removing from the tracker that I point out here make any sense to you?
   
   > Consider this ackTimeout implementation. First, ackTimeout should be given as a TimeSpan (instead of an int or long as milli- or micro-seconds).
   
   Done, though I kept the consumer's configuration options in milliseconds.
   
   > When a message is dequeued, and if we have an ackTimeout, then we store the MessageId and Stopwatch.GetTimestamp() in an "AwaitingAck" struct in a ConcurrentQueue, let's call it "AwaitingAcks". Other suggestions for concurrent collections with fast insertion are welcome.
   
   I was struggling a bit to create a comparable TimeSpan (since, as you point out, Stopwatch ticks != TimeSpan ticks). I followed [this](http://geekswithblogs.net/BlackRabbitCoder/archive/2012/01/12/c.net-little-pitfalls-stopwatch-ticks-are-not-timespan-ticks.aspx) article and ended up with: https://github.com/apache/pulsar-dotpulsar/blob/f4725f5d81e8715a55ab3e4ea6791f903e9e9ad4/src/DotPulsar/Internal/UnackedMessageTracker.cs#L24-L25
   Without the explicit cast of the frequency to double, the division is an integer division that truncates the elapsed time to whole seconds, which loses a lot of accuracy.
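   
   To make the conversion concrete, here is a minimal sketch of both directions. `StopwatchTicks`, `ToTimeSpan` and `FromTimeSpan` are hypothetical helper names, and the frequency is passed in as a parameter (in real code it would be `Stopwatch.Frequency`) so that the arithmetic is easy to check.
   
   ```csharp
   using System;
   
   // Hypothetical helpers converting between Stopwatch ticks and TimeSpan.
   public static class StopwatchTicks
   {
       // Stopwatch ticks -> TimeSpan. The cast to double avoids integer
       // division, which would truncate the result to whole seconds.
       public static TimeSpan ToTimeSpan(long stopwatchTicks, long frequency)
       {
           var seconds = stopwatchTicks / (double) frequency;
           return TimeSpan.FromTicks((long) (seconds * TimeSpan.TicksPerSecond));
       }
   
       // TimeSpan -> Stopwatch ticks, e.g. to precompute a timeout once and
       // compare raw timestamps afterwards.
       public static long FromTimeSpan(TimeSpan timeout, long frequency)
           => (long) (timeout.TotalSeconds * frequency);
   }
   ```
   
   Converting the timeout to Stopwatch ticks once, rather than converting every elapsed value to a `TimeSpan`, keeps the per-message comparison down to a subtraction of two `long` values.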
   
   > When a message is acknowledged, and if we have an ackTimeout, then we store the acks instead of removing them from "AwaitingAcks". If we want to remove them right away, then we need the "AwaitingAcks" collection to support both iteration and random deletion. We only need to store the highest cumulative ack we see (if there is one) and the MessageIds not included by that cumulative ack.
   
   How can I determine if there is a highest cumulative ack from a `MessageId` instance? And if we do this wouldn't we also need some kind of removal from the unacked list that removes until this highest value, like: [removeMessagesTill](https://github.com/apache/pulsar/blob/6926180966f45eb9c1499b7f0eb32ea2a1368fd6/pulsar-client/src/main/java/org/apache/pulsar/client/impl/UnAckedMessageTracker.java#L225)?
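   
   To illustrate what I mean by removing up to the highest value, a sketch in the spirit of the `removeMessagesTill` method linked above could look like this. It is only an assumption-laden example: `PendingAcks` is a hypothetical class, and ids are reduced to `(ledgerId, entryId)` pairs ordered by ledger and then entry, ignoring partition and batch index.
   
   ```csharp
   using System.Collections.Generic;
   
   // Hypothetical sketch: pending ids kept in sorted order so that a
   // cumulative ack can drop everything up to and including a given id.
   public sealed class PendingAcks
   {
       // Value tuples compare element by element: ledger id first, then entry id.
       private readonly SortedSet<(ulong LedgerId, ulong EntryId)> _pending =
           new SortedSet<(ulong LedgerId, ulong EntryId)>();
   
       public int Count => _pending.Count;
   
       public void Add(ulong ledgerId, ulong entryId) => _pending.Add((ledgerId, entryId));
   
       // Individual ack: remove exactly one id.
       public bool Remove(ulong ledgerId, ulong entryId) => _pending.Remove((ledgerId, entryId));
   
       // Cumulative ack: remove every id that is <= the acknowledged id.
       // Returns the number of ids removed.
       public int RemoveUpTo(ulong ledgerId, ulong entryId)
       {
           var upTo = (ledgerId, entryId);
           return _pending.RemoveWhere(id => id.CompareTo(upTo) <= 0);
       }
   }
   ```
   
   This sketch is not thread-safe, so it would need a lock (or a different collection) if acks and the tracker loop run concurrently.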
   
   > 
   > When the ackTracker wakes up and has calculated what ackTimeout is in Stopwatch ticks (those are not the same as TimeSpan ticks), it will call Stopwatch.GetTimestamp(). We will now TryPeek and Dequeue from "AwaitingAcks" for as long as the tracker timestamp - AwaitingAck.Timestamp is larger than the calculated timeout.
   > If the MessageId is not acknowledged, it's added to a CommandRedeliverUnacknowledgedMessages (that we are reusing), which is then sent (if MessageIds were added).
   > If the MessageId was acknowledged, then we can remove that MessageId from the AckedMessageIds.
   > 
   
   I tried to capture this in https://github.com/apache/pulsar-dotpulsar/blob/51f4a984a5819aeb91ebcd25345b495c87cc02a7/src/DotPulsar/Internal/UnackedMessageTracker.cs#L81-L91. The only thing I'm still unsure about is the cumulative acking.
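   
   For reference, a stripped-down sketch of one such wake-up pass is shown below. It is not the code behind the link: `AckTimeoutSweep` and `CollectExpired` are hypothetical names, ids are plain strings, and the current timestamp is passed in (instead of calling `Stopwatch.GetTimestamp()`) to keep the example deterministic.
   
   ```csharp
   using System.Collections.Concurrent;
   using System.Collections.Generic;
   
   // Hypothetical sketch of a single ack-timeout sweep over a FIFO of
   // (id, timestamp) entries. Assumes one sweeper at a time.
   public sealed class AckTimeoutSweep
   {
       private readonly ConcurrentQueue<(string Id, long Timestamp)> _awaiting =
           new ConcurrentQueue<(string Id, long Timestamp)>();
       private readonly ConcurrentDictionary<string, byte> _acked =
           new ConcurrentDictionary<string, byte>();
       private readonly long _timeoutTicks;
   
       public AckTimeoutSweep(long timeoutTicks) => _timeoutTicks = timeoutTicks;
   
       public void Add(string id, long now) => _awaiting.Enqueue((id, now));
   
       public void Ack(string id) => _acked.TryAdd(id, 0);
   
       // Dequeues every entry older than the timeout and returns the ids that
       // were never acked, i.e. the ones to ask the broker to redeliver.
       public List<string> CollectExpired(long now)
       {
           var expired = new List<string>();
   
           while (_awaiting.TryPeek(out var head) && now - head.Timestamp > _timeoutTicks)
           {
               if (!_awaiting.TryDequeue(out var item))
                   break;
   
               // If the id was acked, forget the ack; otherwise it timed out.
               if (!_acked.TryRemove(item.Id, out _))
                   expired.Add(item.Id);
           }
   
           return expired;
       }
   }
   ```
   
   Since entries are enqueued in timestamp order, the sweep can stop at the first entry that has not timed out yet. Note that in this sketch an ack arriving after its entry was already swept stays in the acked set, which a real tracker would have to handle.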
   
   > Consider this nackTimeout implementation.
   > 
   > When a message is nacked, we add the MessageId(s) to a CommandRedeliverUnacknowledgedMessages that we are reusing (should our "RedeliverUnacknowledgedMessages" overload taking an enumerable of MessageIds actually have been called "NegativeAcknowledge"?).
   > 
   > When the nackTracker wakes up it will check if the CommandRedeliverUnacknowledgedMessages has MessageIds and if yes, then send it.
   
   I will implement the nack tracker if you are happy with the unacked tracker as it stands (which might also be refactored into one single tracker if two trackers are a performance concern to you).
   
   > 
   > Writing such a detailed implementation description was not what I intended, but when I first get going.... :-D
   > Anyway, if it is unclear, then I can try and make those classes/structs and push them to master.
   
   Let me know what you think. This is still a bit of a learning process for me, both on the internals of this lib and on C# / Pulsar details. Thanks in advance 👍 .
   
   
   





[GitHub] [pulsar-dotpulsar] dionjansen commented on a change in pull request #67: Support for (optional) ack timeout and nack delay for consumers

Posted by GitBox <gi...@apache.org>.
dionjansen commented on a change in pull request #67:
URL: https://github.com/apache/pulsar-dotpulsar/pull/67#discussion_r558440605



##########
File path: src/DotPulsar/Internal/UnackedMessageTracker.cs
##########
@@ -0,0 +1,101 @@
+namespace DotPulsar.Internal
+{
+    using Abstractions;
+    using DotPulsar.Abstractions;
+    using System;
+    using System.Collections.Concurrent;
+    using System.Diagnostics;
+    using System.Linq;
+    using System.Collections.Generic;
+    using System.Threading;
+    using System.Threading.Tasks;
+
+    public readonly struct AwaitingAck
+    {
+        public MessageId MessageId { get; }
+        public long Timestamp { get; }
+
+        public AwaitingAck(MessageId messageId)
+        {
+            MessageId = messageId;
+            Timestamp = Stopwatch.GetTimestamp();
+        }
+
+        public TimeSpan Elapsed => TimeSpan.FromTicks(
+            (long) ((Stopwatch.GetTimestamp() - Timestamp) / (double)Stopwatch.Frequency * TimeSpan.TicksPerSecond));
+    }
+
+    public sealed class UnackedMessageTracker : IUnackedMessageTracker
+    {
+        private readonly TimeSpan _ackTimeout;
+        private readonly TimeSpan _pollingTimeout;
+        private readonly ConcurrentQueue<AwaitingAck> _awaitingAcks;
+        private readonly List<MessageId> _acked;
+        private readonly CancellationTokenSource _cancellationTokenSource;
+
+        
+        public UnackedMessageTracker(TimeSpan ackTimeout, TimeSpan pollingTimeout)
+        {
+            _ackTimeout = ackTimeout;
+            _pollingTimeout = pollingTimeout;
+            _awaitingAcks = new ConcurrentQueue<AwaitingAck>();
+            _acked = new List<MessageId>();
+            _cancellationTokenSource = new CancellationTokenSource();
+        }
+
+        public void Add(MessageId messageId)
+        {
+            _awaitingAcks.Enqueue(new AwaitingAck(messageId));
+        }
+
+        public void Ack(MessageId messageId)
+        {
+            // We only need to store the highest cumulative ack we see (if there is one)
+            // and the MessageIds not included by that cumulative ack.
+            _acked.Add(messageId);
+        }
+
+        public Task Start(IConsumer consumer, CancellationToken cancellationToken = default)
+        {
+            CancellationToken token =
+              CancellationTokenSource.CreateLinkedTokenSource(
+                  _cancellationTokenSource.Token, cancellationToken).Token;
+
+            return Task.Run(async () => {

Review comment:
       OK, fixed in f02166b. I followed the same pattern as in StateMonitor.







[GitHub] [pulsar-dotpulsar] dionjansen commented on a change in pull request #67: Support for (optional) ack timeout and nack delay for consumers

Posted by GitBox <gi...@apache.org>.
dionjansen commented on a change in pull request #67:
URL: https://github.com/apache/pulsar-dotpulsar/pull/67#discussion_r552896507



##########
File path: src/DotPulsar/ConsumerOptions.cs
##########
@@ -101,5 +101,16 @@ public ConsumerOptions(string subscriptionName, string topic)
         /// Set the topic for this consumer. This is required.
         /// </summary>
         public string Topic { get; set; }
+
+        /// <summary>
+        /// Delay to wait before redelivering messages that failed to be processed.
+        /// When an application uses IConsumer.NegativeAcknowledge(Message), failed messages are redelivered after a fixed timeout.
+        /// </summary>
+        public int NegativeAckRedeliveryDelayMicros { get; set; }
+
+        /// <summary>
+        /// Timeout of unacked messages
+        /// </summary>
+        public int AckTimeoutMillis { get; set; }

Review comment:
       Refactored in 871dce6







[GitHub] [pulsar-dotpulsar] dionjansen commented on pull request #67: Support for (optional) ack timeout and nack delay for consumers

Posted by GitBox <gi...@apache.org>.
dionjansen commented on pull request #67:
URL: https://github.com/apache/pulsar-dotpulsar/pull/67#issuecomment-877827074


   @jbvanzuylen @blankensteiner Yes, this unfortunately dropped very far off my radar. I see quite a lot has changed since this implementation, so I'll try to open a new PR from the latest version.





[GitHub] [pulsar-dotpulsar] dionjansen commented on pull request #67: Support for (optional) ack timeout and nack delay for consumers

Posted by GitBox <gi...@apache.org>.
dionjansen commented on pull request #67:
URL: https://github.com/apache/pulsar-dotpulsar/pull/67#issuecomment-761063358


   @blankensteiner Sorry for the delay. I addressed all your remarks, let me know what you think!





[GitHub] [pulsar-dotpulsar] dionjansen commented on a change in pull request #67: Support for (optional) ack timeout and nack delay for consumers

Posted by GitBox <gi...@apache.org>.
dionjansen commented on a change in pull request #67:
URL: https://github.com/apache/pulsar-dotpulsar/pull/67#discussion_r552896429



##########
File path: src/DotPulsar/ConsumerOptions.cs
##########
@@ -101,5 +101,16 @@ public ConsumerOptions(string subscriptionName, string topic)
         /// Set the topic for this consumer. This is required.
         /// </summary>
         public string Topic { get; set; }
+
+        /// <summary>
+        /// Delay to wait before redelivering messages that failed to be processed.
+        /// When an application uses IConsumer.NegativeAcknowledge(Message), failed messages are redelivered after a fixed timeout.
+        /// </summary>
+        public int NegativeAckRedeliveryDelayMicros { get; set; }

Review comment:
       Refactored in 871dce6







[GitHub] [pulsar-dotpulsar] dionjansen commented on a change in pull request #67: Support for (optional) ack timeout and nack delay for consumers

Posted by GitBox <gi...@apache.org>.
dionjansen commented on a change in pull request #67:
URL: https://github.com/apache/pulsar-dotpulsar/pull/67#discussion_r552560936



##########
File path: src/DotPulsar/Internal/MessageQueue.cs
##########
@@ -0,0 +1,45 @@
+namespace DotPulsar.Internal
+{
+    using Abstractions;
+    using DotPulsar.Abstractions;
+    using DotPulsar.Exceptions;
+    using Events;
+    using Microsoft.Extensions.ObjectPool;
+    using PulsarApi;
+    using System;
+    using System.Collections.Generic;
+    using System.Linq;
+    using System.Runtime.CompilerServices;
+    using System.Threading;
+    using System.Threading.Tasks;
+
+    public sealed class MessageQueue : IMessageQueue, IDequeue<MessagePackage>, IDisposable
+    {
+        private readonly AsyncQueue<MessagePackage> _queue;
+        private readonly IUnackedMessageTracker _tracker;
+        public MessageQueue(AsyncQueue<MessagePackage> queue, IUnackedMessageTracker tracker)
+        {
+            _queue = queue;
+            _tracker = tracker;
+        }
+        public async ValueTask<MessagePackage> Dequeue(CancellationToken cancellationToken = default)
+        {
+            var message = await _queue.Dequeue(cancellationToken).ConfigureAwait(false);
+            _tracker.Add(new MessageId(message.MessageId));
+            return message;
+        }
+        public void Acknowledge(MessageId obj) => _tracker.Ack(obj);
+
+        public void NegativeAcknowledge(MessageId obj)
+        {
+            throw new NotImplementedException();
+        }
+
+        public void Dispose()
+        {
+            _queue.Dispose();
+            _tracker.Dispose();
+        }
+        

Review comment:
       Done 2edfa35

##########
File path: src/DotPulsar/Internal/MessageQueue.cs
##########
@@ -0,0 +1,45 @@
+namespace DotPulsar.Internal
+{
+    using Abstractions;
+    using DotPulsar.Abstractions;
+    using DotPulsar.Exceptions;
+    using Events;
+    using Microsoft.Extensions.ObjectPool;
+    using PulsarApi;
+    using System;
+    using System.Collections.Generic;
+    using System.Linq;
+    using System.Runtime.CompilerServices;
+    using System.Threading;
+    using System.Threading.Tasks;
+
+    public sealed class MessageQueue : IMessageQueue, IDequeue<MessagePackage>, IDisposable
+    {
+        private readonly AsyncQueue<MessagePackage> _queue;
+        private readonly IUnackedMessageTracker _tracker;
+        public MessageQueue(AsyncQueue<MessagePackage> queue, IUnackedMessageTracker tracker)
+        {
+            _queue = queue;
+            _tracker = tracker;
+        }
+        public async ValueTask<MessagePackage> Dequeue(CancellationToken cancellationToken = default)
+        {
+            var message = await _queue.Dequeue(cancellationToken).ConfigureAwait(false);
+            _tracker.Add(new MessageId(message.MessageId));
+            return message;
+        }
+        public void Acknowledge(MessageId obj) => _tracker.Ack(obj);
+
+        public void NegativeAcknowledge(MessageId obj)
+        {
+            throw new NotImplementedException();
+        }
+
+        public void Dispose()
+        {
+            _queue.Dispose();
+            _tracker.Dispose();
+        }
+        
+    }
+}

Review comment:
       Done 2edfa35

##########
File path: src/DotPulsar/Internal/MessageQueue.cs
##########
@@ -0,0 +1,45 @@
+namespace DotPulsar.Internal
+{
+    using Abstractions;
+    using DotPulsar.Abstractions;
+    using DotPulsar.Exceptions;
+    using Events;
+    using Microsoft.Extensions.ObjectPool;
+    using PulsarApi;
+    using System;
+    using System.Collections.Generic;
+    using System.Linq;
+    using System.Runtime.CompilerServices;
+    using System.Threading;
+    using System.Threading.Tasks;
+
+    public sealed class MessageQueue : IMessageQueue, IDequeue<MessagePackage>, IDisposable
+    {
+        private readonly AsyncQueue<MessagePackage> _queue;
+        private readonly IUnackedMessageTracker _tracker;
+        public MessageQueue(AsyncQueue<MessagePackage> queue, IUnackedMessageTracker tracker)
+        {
+            _queue = queue;
+            _tracker = tracker;
+        }
+        public async ValueTask<MessagePackage> Dequeue(CancellationToken cancellationToken = default)
+        {
+            var message = await _queue.Dequeue(cancellationToken).ConfigureAwait(false);
+            _tracker.Add(new MessageId(message.MessageId));
+            return message;
+        }
+        public void Acknowledge(MessageId obj) => _tracker.Ack(obj);

Review comment:
       Done 2edfa35

##########
File path: src/DotPulsar/Internal/MessageQueue.cs
##########
@@ -0,0 +1,45 @@
+namespace DotPulsar.Internal
+{
+    using Abstractions;
+    using DotPulsar.Abstractions;
+    using DotPulsar.Exceptions;
+    using Events;
+    using Microsoft.Extensions.ObjectPool;
+    using PulsarApi;
+    using System;
+    using System.Collections.Generic;
+    using System.Linq;
+    using System.Runtime.CompilerServices;
+    using System.Threading;
+    using System.Threading.Tasks;
+
+    public sealed class MessageQueue : IMessageQueue, IDequeue<MessagePackage>, IDisposable
+    {
+        private readonly AsyncQueue<MessagePackage> _queue;
+        private readonly IUnackedMessageTracker _tracker;
+        public MessageQueue(AsyncQueue<MessagePackage> queue, IUnackedMessageTracker tracker)

Review comment:
       Done 2edfa35







[GitHub] [pulsar-dotpulsar] dionjansen commented on a change in pull request #67: Support for (optional) ack timeout and nack delay for consumers

Posted by GitBox <gi...@apache.org>.
dionjansen commented on a change in pull request #67:
URL: https://github.com/apache/pulsar-dotpulsar/pull/67#discussion_r552553628



##########
File path: src/DotPulsar/Internal/InactiveUnackedMessageTracker.cs
##########
@@ -0,0 +1,31 @@
+namespace DotPulsar.Internal
+{
+    using System.Threading;
+    using System.Threading.Tasks;
+    using Abstractions;
+    using DotPulsar.Abstractions;
+
+    public class InactiveUnackedMessageTracker : IUnackedMessageTracker
+    {
+        public InactiveUnackedMessageTracker()
+        {
+        }
+
+        public void Ack(MessageId messageId)
+        {
+            return;
+        }
+
+        public void Add(MessageId messageId)
+        {
+            return;
+        }
+
+        public Task Start(IConsumer consumer, CancellationToken cancellationToken = default) => Task.CompletedTask;
+
+        public void Dispose() {
+            return;
+        }
+

Review comment:
       Done







[GitHub] [pulsar-dotpulsar] dionjansen commented on a change in pull request #67: Support for (optional) ack timeout and nack delay for consumers

Posted by GitBox <gi...@apache.org>.
dionjansen commented on a change in pull request #67:
URL: https://github.com/apache/pulsar-dotpulsar/pull/67#discussion_r552561276



##########
File path: src/DotPulsar/Internal/Abstractions/IMessageQueue.cs
##########
@@ -0,0 +1,10 @@
+namespace DotPulsar.Internal.Abstractions
+{
+    using System;
+
+    public interface IMessageQueue : IDequeue<MessagePackage>, IDisposable
+    {
+        void Acknowledge(MessageId obj);

Review comment:
       Done 2edfa35

##########
File path: src/DotPulsar/Internal/Abstractions/IMessageQueue.cs
##########
@@ -0,0 +1,10 @@
+namespace DotPulsar.Internal.Abstractions
+{
+    using System;
+
+    public interface IMessageQueue : IDequeue<MessagePackage>, IDisposable
+    {
+        void Acknowledge(MessageId obj);
+        void NegativeAcknowledge(MessageId obj);
+    }
+}

Review comment:
       Done 2edfa35







[GitHub] [pulsar-dotpulsar] dionjansen commented on a change in pull request #67: Support for (optional) ack timeout and nack delay for consumers

Posted by GitBox <gi...@apache.org>.
dionjansen commented on a change in pull request #67:
URL: https://github.com/apache/pulsar-dotpulsar/pull/67#discussion_r558433871



##########
File path: src/DotPulsar/Internal/InactiveUnackedMessageTracker.cs
##########
@@ -0,0 +1,31 @@
+namespace DotPulsar.Internal
+{
+    using System.Threading;

Review comment:
       OK, I clearly don't have that in my version. Strange. I'll have a look at how to configure my editor to respect .editorconfig.







[GitHub] [pulsar-dotpulsar] dionjansen commented on a change in pull request #67: Support for (optional) ack timeout and nack delay for consumers

Posted by GitBox <gi...@apache.org>.
dionjansen commented on a change in pull request #67:
URL: https://github.com/apache/pulsar-dotpulsar/pull/67#discussion_r558418524



##########
File path: src/DotPulsar/Internal/MessageQueue.cs
##########
@@ -0,0 +1,53 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace DotPulsar.Internal
+{
+    using Abstractions;
+    using System;
+    using System.Threading;
+    using System.Threading.Tasks;
+
+    public sealed class MessageQueue : IMessageQueue, IDequeue<MessagePackage>, IDisposable

Review comment:
       Done: ddfd374

##########
File path: src/DotPulsar/Internal/Abstractions/IUnackedMessageTracker.cs
##########
@@ -0,0 +1,30 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace DotPulsar.Internal.Abstractions
+{
+    using DotPulsar.Abstractions;
+    using System.Threading.Tasks;
+    using System.Threading;
+    using System;
+
+    public interface IUnackedMessageTracker : IDisposable
+    {
+        void Add(MessageId messageId);
+
+        void Ack(MessageId messageId);

Review comment:
       Done: ddfd374







[GitHub] [pulsar-dotpulsar] dionjansen commented on a change in pull request #67: Support for (optional) ack timeout and nack delay for consumers

Posted by GitBox <gi...@apache.org>.
dionjansen commented on a change in pull request #67:
URL: https://github.com/apache/pulsar-dotpulsar/pull/67#discussion_r552532271



##########
File path: src/DotPulsar/Internal/MessageAcksTracker.cs
##########
@@ -0,0 +1,103 @@
+namespace DotPulsar.Internal
+{
+    using Abstractions;
+    using DotPulsar.Abstractions;
+    using DotPulsar.Exceptions;
+    using Events;
+    using Microsoft.Extensions.ObjectPool;
+    using PulsarApi;
+    using System;
+    using System.Collections.Generic;
+    using System.Linq;
+    using System.Runtime.CompilerServices;
+    using System.Threading;
+    using System.Threading.Tasks;
+    using System.Diagnostics;
+
+    internal class Tracker
+    {
+        private readonly Stopwatch _timer;
+        private int maxTimeoutMs;

Review comment:
       I've cleaned up this implementation in https://github.com/apache/pulsar-dotpulsar/pull/67/commits/0119ca95e00be46184f48ab847d5353cf7e6a74f

##########
File path: src/DotPulsar/Internal/MessageAcksTracker.cs
##########
@@ -0,0 +1,103 @@
+namespace DotPulsar.Internal
+{
+    using Abstractions;
+    using DotPulsar.Abstractions;
+    using DotPulsar.Exceptions;
+    using Events;
+    using Microsoft.Extensions.ObjectPool;
+    using PulsarApi;
+    using System;
+    using System.Collections.Generic;
+    using System.Linq;
+    using System.Runtime.CompilerServices;
+    using System.Threading;
+    using System.Threading.Tasks;
+    using System.Diagnostics;
+
+    internal class Tracker
+    {
+        private readonly Stopwatch _timer;
+        private int maxTimeoutMs;
+
+        public Tracker(int timeoutMs)
+        {
+            maxTimeoutMs = timeoutMs;
+            _timer = new Stopwatch();

Review comment:
       I've cleaned up this implementation in https://github.com/apache/pulsar-dotpulsar/pull/67/commits/0119ca95e00be46184f48ab847d5353cf7e6a74f

##########
File path: src/DotPulsar/Internal/MessageAcksTracker.cs
##########
@@ -0,0 +1,103 @@
+namespace DotPulsar.Internal
+{
+    using Abstractions;
+    using DotPulsar.Abstractions;
+    using DotPulsar.Exceptions;
+    using Events;
+    using Microsoft.Extensions.ObjectPool;
+    using PulsarApi;
+    using System;
+    using System.Collections.Generic;
+    using System.Linq;
+    using System.Runtime.CompilerServices;
+    using System.Threading;
+    using System.Threading.Tasks;
+    using System.Diagnostics;
+
+    internal class Tracker
+    {
+        private readonly Stopwatch _timer;
+        private int maxTimeoutMs;
+
+        public Tracker(int timeoutMs)
+        {
+            maxTimeoutMs = timeoutMs;
+            _timer = new Stopwatch();
+            _timer.Start();
+        }
+
+        public bool IsTimedOut() => _timer.ElapsedMilliseconds > maxTimeoutMs;
+
+        public long msTillTimeout => maxTimeoutMs - _timer.ElapsedMilliseconds;
+
+        public void Reset(int newTimeoutMs)
+        {
+            maxTimeoutMs = newTimeoutMs;
+            _timer.Restart();
+        }
+    }
+
+    // TODO add mechnism to stop tracker when disposed
+    public sealed class MessageAcksTracker : IMessageAcksTracker<MessageId>

Review comment:
       I've cleaned up this implementation in https://github.com/apache/pulsar-dotpulsar/pull/67/commits/0119ca95e00be46184f48ab847d5353cf7e6a74f

##########
File path: src/DotPulsar/Internal/MessageAcksTracker.cs
##########
@@ -0,0 +1,103 @@
+namespace DotPulsar.Internal
+{
+    using Abstractions;
+    using DotPulsar.Abstractions;
+    using DotPulsar.Exceptions;
+    using Events;
+    using Microsoft.Extensions.ObjectPool;
+    using PulsarApi;
+    using System;
+    using System.Collections.Generic;
+    using System.Linq;
+    using System.Runtime.CompilerServices;
+    using System.Threading;
+    using System.Threading.Tasks;
+    using System.Diagnostics;
+
+    internal class Tracker
+    {
+        private readonly Stopwatch _timer;
+        private int maxTimeoutMs;
+
+        public Tracker(int timeoutMs)
+        {
+            maxTimeoutMs = timeoutMs;
+            _timer = new Stopwatch();
+            _timer.Start();
+        }
+
+        public bool IsTimedOut() => _timer.ElapsedMilliseconds > maxTimeoutMs;
+
+        public long msTillTimeout => maxTimeoutMs - _timer.ElapsedMilliseconds;
+
+        public void Reset(int newTimeoutMs)
+        {
+            maxTimeoutMs = newTimeoutMs;
+            _timer.Restart();
+        }
+    }
+
+    // TODO add mechnism to stop tracker when disposed
+    public sealed class MessageAcksTracker : IMessageAcksTracker<MessageId>
+    {
+        private readonly Dictionary<MessageId, Tracker> _trackers;
+        private readonly int _unackedTimeoutMs;
+        private readonly int _nackTimeoutMs;
+        private readonly int _trackerDelayMs;
+        public MessageAcksTracker(int unackedTimeoutMs, int nackTimeoutMs, int trackerDelayMs)
+        {
+            _unackedTimeoutMs = unackedTimeoutMs;
+            _nackTimeoutMs = nackTimeoutMs;
+            _trackerDelayMs = trackerDelayMs;
+            _trackers = new Dictionary<MessageId, Tracker>();
+        }
+
+        public async Task StartTracker(IConsumer consumer, CancellationToken cancellationToken)
+        {
+            await Task.Yield();
+
+            while (true)
+            {
+                await Task.Delay(_trackerDelayMs);
+
+                var messageIds = new List<MessageId>();
+                foreach (KeyValuePair<MessageId, Tracker> p in _trackers)
+                {
+                    if (p.Value.IsTimedOut())
+                        messageIds.Add(p.Key);
+                }
+
+                if (messageIds.Count() > 0)
+                    await consumer.RedeliverUnacknowledgedMessages(messageIds, cancellationToken).ConfigureAwait(false);
+
+            }
+        }
+        public MessageId Add(MessageId message)
+        {
+            if (!_trackers.ContainsKey(message))
+            {

Review comment:
       I've cleaned up this implementation in https://github.com/apache/pulsar-dotpulsar/pull/67/commits/0119ca95e00be46184f48ab847d5353cf7e6a74f
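
The linked commit is not quoted in this thread. Purely as an illustration of the TODO in the hunk above (stopping the tracker), here is a minimal sketch of a polling loop that honors cancellation. It is not the PR's code: the class name is made up, string ids stand in for `MessageId`, and the `redeliver` callback is a hypothetical stand-in for `IConsumer.RedeliverUnacknowledgedMessages`.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical sketch; string ids stand in for MessageId.
public sealed class AckTimeoutLoopSketch
{
    // ConcurrentDictionary because Add/Ack may be called while the loop enumerates.
    private readonly ConcurrentDictionary<string, Stopwatch> _pending =
        new ConcurrentDictionary<string, Stopwatch>();
    private readonly TimeSpan _ackTimeout;
    private readonly TimeSpan _pollInterval;

    public AckTimeoutLoopSketch(TimeSpan ackTimeout, TimeSpan pollInterval)
    {
        _ackTimeout = ackTimeout;
        _pollInterval = pollInterval;
    }

    public void Add(string messageId) => _pending.TryAdd(messageId, Stopwatch.StartNew());

    public void Ack(string messageId) => _pending.TryRemove(messageId, out _);

    public async Task Start(
        Func<IReadOnlyList<string>, CancellationToken, Task> redeliver,
        CancellationToken cancellationToken)
    {
        while (!cancellationToken.IsCancellationRequested)
        {
            // Passing the token lets the delay end as soon as cancellation is requested
            // (it then throws TaskCanceledException).
            await Task.Delay(_pollInterval, cancellationToken).ConfigureAwait(false);

            var timedOut = _pending
                .Where(p => p.Value.Elapsed > _ackTimeout)
                .Select(p => p.Key)
                .ToList();

            if (timedOut.Count == 0)
                continue;

            await redeliver(timedOut, cancellationToken).ConfigureAwait(false);

            // Restart the timers so the same ids are not reported again on the next poll.
            foreach (var id in timedOut)
            {
                if (_pending.TryGetValue(id, out var timer))
                    timer.Restart();
            }
        }
    }
}
```

With this shape, cancelling the token passed to `Start` (for example from `Dispose`) is enough to stop the loop; whether the PR took that route is not visible here.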







[GitHub] [pulsar-dotpulsar] dionjansen commented on a change in pull request #67: Support for (optional) ack timeout and nack delay for consumers

Posted by GitBox <gi...@apache.org>.
dionjansen commented on a change in pull request #67:
URL: https://github.com/apache/pulsar-dotpulsar/pull/67#discussion_r558418794



##########
File path: tests/DotPulsar.Tests/Internal/MessageAcksTrackerTests.cs
##########
@@ -0,0 +1,158 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace DotPulsar.Tests.Internal
+{
+    using AutoFixture;
+    using AutoFixture.AutoNSubstitute;
+    using DotPulsar.Abstractions;
+    using DotPulsar.Internal;
+    using NSubstitute;
+    using System;
+    using System.Collections.Generic;
+    using System.Linq;
+    using System.Linq.Expressions;
+    using System.Threading;
+    using System.Threading.Tasks;
+    using Xunit;
+
+    public class UnackedMessageTrackerTests
+    {
+
+        [Fact]
+        public async void Start_GivenAMessageIdIsNotAcked_ShouldRedeliver()
+        {
+            //Arrange
+            var fixture = new Fixture();
+            fixture.Customize(new AutoNSubstituteCustomization());
+            var consumer = Substitute.For<IConsumer>();
+            var messageId = MessageId.Latest;
+            var cts = new CancellationTokenSource();
+
+
+            var tracker = new UnackedMessageTracker(
+                TimeSpan.FromMilliseconds(10),
+                TimeSpan.FromMilliseconds(1));
+
+            //Act
+            tracker.Add(messageId);
+            cts.CancelAfter(20);
+            try { await tracker.Start(consumer, cts.Token); }
+            catch (TaskCanceledException) { }
+
+            //Assert
+            await consumer
+                .Received(1)
+                .RedeliverUnacknowledgedMessages(
+                    Arg.Is(EquivalentTo(new List<MessageId>() { messageId })),
+                    Arg.Any<CancellationToken>());
+        }
+
+        [Fact]
+        public async void Start_GivenAMessageIdIsAckedWithinTimeout_ShouldNotRedeliver()
+        {
+            //Arrange
+            var fixture = new Fixture();
+            fixture.Customize(new AutoNSubstituteCustomization());
+            var consumer = Substitute.For<IConsumer>();
+            var messageId = MessageId.Latest;
+            var cts = new CancellationTokenSource();
+
+
+            var tracker = new UnackedMessageTracker(
+                TimeSpan.FromMilliseconds(10),
+                TimeSpan.FromMilliseconds(1));
+
+            //Act
+            tracker.Add(messageId);
+            cts.CancelAfter(20);
+            var _ = Task.Delay(5).ContinueWith(_ => tracker.Ack(messageId));
+            try { await tracker.Start(consumer, cts.Token); }
+            catch (TaskCanceledException) { }
+
+            //Assert
+            await consumer
+                .DidNotReceive()
+                .RedeliverUnacknowledgedMessages(
+                    Arg.Any<IEnumerable<MessageId>>(),
+                    Arg.Any<CancellationToken>());
+        }
+
+        [Fact]
+        public async void Start_GivenAMessageIdIsNotAckedWithinTimeout_ShouldRedeliver()
+        {
+            //Arrange
+            var fixture = new Fixture();
+            fixture.Customize(new AutoNSubstituteCustomization());
+            var consumer = Substitute.For<IConsumer>();
+            var messageId = MessageId.Latest;
+            var cts = new CancellationTokenSource();
+
+
+            var tracker = new UnackedMessageTracker(
+                TimeSpan.FromMilliseconds(10),
+                TimeSpan.FromMilliseconds(1));
+
+            //Act
+            tracker.Add(messageId);
+            cts.CancelAfter(20);
+
+            var _ = Task.Delay(15).ContinueWith(_ => tracker.Ack(messageId));
+            try { await tracker.Start(consumer, cts.Token); }
+            catch (TaskCanceledException) { }
+
+            //Assert
+            await consumer
+                .Received(1)
+                .RedeliverUnacknowledgedMessages(
+                    Arg.Any<IEnumerable<MessageId>>(),
+                    Arg.Any<CancellationToken>());
+        }
+
+        [Fact]
+        public async void Start_GivenAMessageIdIsNotAckedWithinTimeout_ShouldRedeliverOnlyOnce()
+        {
+            //Arrange
+            var fixture = new Fixture();
+            fixture.Customize(new AutoNSubstituteCustomization());
+            var consumer = Substitute.For<IConsumer>();
+            var messageId = MessageId.Latest;
+            var cts = new CancellationTokenSource();
+

Review comment:
       Done: ddfd374

##########
File path: tests/DotPulsar.Tests/Internal/MessageAcksTrackerTests.cs
##########
@@ -0,0 +1,158 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace DotPulsar.Tests.Internal
+{
+    using AutoFixture;
+    using AutoFixture.AutoNSubstitute;
+    using DotPulsar.Abstractions;
+    using DotPulsar.Internal;
+    using NSubstitute;
+    using System;
+    using System.Collections.Generic;
+    using System.Linq;
+    using System.Linq.Expressions;
+    using System.Threading;
+    using System.Threading.Tasks;
+    using Xunit;
+
+    public class UnackedMessageTrackerTests
+    {
+
+        [Fact]
+        public async void Start_GivenAMessageIdIsNotAcked_ShouldRedeliver()
+        {
+            //Arrange
+            var fixture = new Fixture();
+            fixture.Customize(new AutoNSubstituteCustomization());
+            var consumer = Substitute.For<IConsumer>();
+            var messageId = MessageId.Latest;
+            var cts = new CancellationTokenSource();
+
+
+            var tracker = new UnackedMessageTracker(
+                TimeSpan.FromMilliseconds(10),
+                TimeSpan.FromMilliseconds(1));
+
+            //Act
+            tracker.Add(messageId);
+            cts.CancelAfter(20);
+            try { await tracker.Start(consumer, cts.Token); }
+            catch (TaskCanceledException) { }
+
+            //Assert
+            await consumer
+                .Received(1)
+                .RedeliverUnacknowledgedMessages(
+                    Arg.Is(EquivalentTo(new List<MessageId>() { messageId })),
+                    Arg.Any<CancellationToken>());
+        }
+
+        [Fact]
+        public async void Start_GivenAMessageIdIsAckedWithinTimeout_ShouldNotRedeliver()
+        {
+            //Arrange
+            var fixture = new Fixture();
+            fixture.Customize(new AutoNSubstituteCustomization());
+            var consumer = Substitute.For<IConsumer>();
+            var messageId = MessageId.Latest;
+            var cts = new CancellationTokenSource();
+
+
+            var tracker = new UnackedMessageTracker(
+                TimeSpan.FromMilliseconds(10),
+                TimeSpan.FromMilliseconds(1));
+
+            //Act
+            tracker.Add(messageId);
+            cts.CancelAfter(20);
+            var _ = Task.Delay(5).ContinueWith(_ => tracker.Ack(messageId));
+            try { await tracker.Start(consumer, cts.Token); }
+            catch (TaskCanceledException) { }
+
+            //Assert
+            await consumer
+                .DidNotReceive()
+                .RedeliverUnacknowledgedMessages(
+                    Arg.Any<IEnumerable<MessageId>>(),
+                    Arg.Any<CancellationToken>());
+        }
+
+        [Fact]
+        public async void Start_GivenAMessageIdIsNotAckedWithinTimeout_ShouldRedeliver()
+        {
+            //Arrange
+            var fixture = new Fixture();
+            fixture.Customize(new AutoNSubstituteCustomization());
+            var consumer = Substitute.For<IConsumer>();
+            var messageId = MessageId.Latest;
+            var cts = new CancellationTokenSource();
+

Review comment:
       Done: ddfd374

##########
File path: tests/DotPulsar.Tests/Internal/MessageAcksTrackerTests.cs
##########
@@ -0,0 +1,158 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace DotPulsar.Tests.Internal
+{
+    using AutoFixture;
+    using AutoFixture.AutoNSubstitute;
+    using DotPulsar.Abstractions;
+    using DotPulsar.Internal;
+    using NSubstitute;
+    using System;
+    using System.Collections.Generic;
+    using System.Linq;
+    using System.Linq.Expressions;
+    using System.Threading;
+    using System.Threading.Tasks;
+    using Xunit;
+
+    public class UnackedMessageTrackerTests
+    {
+
+        [Fact]
+        public async void Start_GivenAMessageIdIsNotAcked_ShouldRedeliver()
+        {
+            //Arrange
+            var fixture = new Fixture();
+            fixture.Customize(new AutoNSubstituteCustomization());
+            var consumer = Substitute.For<IConsumer>();
+            var messageId = MessageId.Latest;
+            var cts = new CancellationTokenSource();
+
+
+            var tracker = new UnackedMessageTracker(
+                TimeSpan.FromMilliseconds(10),
+                TimeSpan.FromMilliseconds(1));
+
+            //Act
+            tracker.Add(messageId);
+            cts.CancelAfter(20);
+            try { await tracker.Start(consumer, cts.Token); }
+            catch (TaskCanceledException) { }
+
+            //Assert
+            await consumer
+                .Received(1)
+                .RedeliverUnacknowledgedMessages(
+                    Arg.Is(EquivalentTo(new List<MessageId>() { messageId })),
+                    Arg.Any<CancellationToken>());
+        }
+
+        [Fact]
+        public async void Start_GivenAMessageIdIsAckedWithinTimeout_ShouldNotRedeliver()
+        {
+            //Arrange
+            var fixture = new Fixture();
+            fixture.Customize(new AutoNSubstituteCustomization());
+            var consumer = Substitute.For<IConsumer>();
+            var messageId = MessageId.Latest;
+            var cts = new CancellationTokenSource();
+

Review comment:
       Done: ddfd374

##########
File path: tests/DotPulsar.Tests/Internal/MessageAcksTrackerTests.cs
##########
@@ -0,0 +1,158 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace DotPulsar.Tests.Internal
+{
+    using AutoFixture;
+    using AutoFixture.AutoNSubstitute;
+    using DotPulsar.Abstractions;
+    using DotPulsar.Internal;
+    using NSubstitute;
+    using System;
+    using System.Collections.Generic;
+    using System.Linq;
+    using System.Linq.Expressions;
+    using System.Threading;
+    using System.Threading.Tasks;
+    using Xunit;
+
+    public class UnackedMessageTrackerTests
+    {
+

Review comment:
       Done: ddfd374

##########
File path: src/DotPulsar/Internal/InactiveUnackedMessageTracker.cs
##########
@@ -0,0 +1,42 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace DotPulsar.Internal
+{
+    using Abstractions;
+    using DotPulsar.Abstractions;
+    using System.Threading;
+    using System.Threading.Tasks;
+
+    public class InactiveUnackedMessageTracker : IUnackedMessageTracker

Review comment:
       Done: ddfd374

##########
File path: tests/DotPulsar.Tests/Internal/MessageAcksTrackerTests.cs
##########
@@ -0,0 +1,158 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace DotPulsar.Tests.Internal
+{
+    using AutoFixture;
+    using AutoFixture.AutoNSubstitute;
+    using DotPulsar.Abstractions;
+    using DotPulsar.Internal;
+    using NSubstitute;
+    using System;
+    using System.Collections.Generic;
+    using System.Linq;
+    using System.Linq.Expressions;
+    using System.Threading;
+    using System.Threading.Tasks;
+    using Xunit;
+
+    public class UnackedMessageTrackerTests
+    {
+
+        [Fact]
+        public async void Start_GivenAMessageIdIsNotAcked_ShouldRedeliver()
+        {
+            //Arrange
+            var fixture = new Fixture();
+            fixture.Customize(new AutoNSubstituteCustomization());
+            var consumer = Substitute.For<IConsumer>();
+            var messageId = MessageId.Latest;
+            var cts = new CancellationTokenSource();
+

Review comment:
       Done: ddfd374







[GitHub] [pulsar-dotpulsar] dionjansen commented on pull request #67: Support for (optional) ack timeout and nack delay for consumers

Posted by GitBox <gi...@apache.org>.
dionjansen commented on pull request #67:
URL: https://github.com/apache/pulsar-dotpulsar/pull/67#issuecomment-755502834


   @blankensteiner thanks for the comments. I managed to fix most of them, with a few side comments/questions (see above). Let me know what you think.





[GitHub] [pulsar-dotpulsar] dionjansen commented on a change in pull request #67: Support for (optional) ack timeout and nack delay for consumers

Posted by GitBox <gi...@apache.org>.
dionjansen commented on a change in pull request #67:
URL: https://github.com/apache/pulsar-dotpulsar/pull/67#discussion_r552557276



##########
File path: src/DotPulsar/Internal/InactiveMessageAcksTracker.cs
##########
@@ -0,0 +1,30 @@
+namespace DotPulsar.Internal
+{
+    using Abstractions;
+    using DotPulsar.Abstractions;
+    using DotPulsar.Exceptions;
+    using Events;
+    using Microsoft.Extensions.ObjectPool;
+    using PulsarApi;
+    using System;
+    using System.Collections.Generic;
+    using System.Linq;
+    using System.Runtime.CompilerServices;
+    using System.Threading;
+    using System.Threading.Tasks;
+    using System.Diagnostics;
+
+    public sealed class InactiveMessageAcksTracker : IMessageAcksTracker<MessageId>
+    {
+        public InactiveMessageAcksTracker() { }

Review comment:
       Done https://github.com/apache/pulsar-dotpulsar/pull/67/commits/a7d5f02704d2ab598377009bf1115794c1a6db2b







[GitHub] [pulsar-dotpulsar] dionjansen commented on a change in pull request #67: Support for (optional) ack timeout and nack delay for consumers

Posted by GitBox <gi...@apache.org>.
dionjansen commented on a change in pull request #67:
URL: https://github.com/apache/pulsar-dotpulsar/pull/67#discussion_r552556303



##########
File path: tests/DotPulsar.Tests/Internal/MessageAcksTrackerTests.cs
##########
@@ -0,0 +1,172 @@
+namespace DotPulsar.Tests.Internal

Review comment:
       Done 7ad89a2







[GitHub] [pulsar-dotpulsar] blankensteiner commented on a change in pull request #67: Support for (optional) ack timeout and nack delay for consumers

Posted by GitBox <gi...@apache.org>.
blankensteiner commented on a change in pull request #67:
URL: https://github.com/apache/pulsar-dotpulsar/pull/67#discussion_r552465648



##########
File path: src/DotPulsar/ConsumerOptions.cs
##########
@@ -101,5 +101,16 @@ public ConsumerOptions(string subscriptionName, string topic)
         /// Set the topic for this consumer. This is required.
         /// </summary>
         public string Topic { get; set; }
+
+        /// <summary>
+        /// Delay to wait before redelivering messages that failed to be processed.
+        /// When an application uses IConsumer.NegativeAcknowledge(Message), failed messages are redelivered after a fixed timeout.
+        /// </summary>
+        public int NegativeAckRedeliveryDelayMicros { get; set; }

Review comment:
       Let's make this a TimeSpan and call it "NegativeAcknowledgementRedeliveryDelay"

##########
File path: src/DotPulsar/Internal/Abstractions/IMessageAcksTracker.cs
##########
@@ -0,0 +1,13 @@
+namespace DotPulsar.Internal.Abstractions

Review comment:
       We need the Apache header at the top of every file

##########
File path: src/DotPulsar/ConsumerOptions.cs
##########
@@ -101,5 +101,16 @@ public ConsumerOptions(string subscriptionName, string topic)
         /// Set the topic for this consumer. This is required.
         /// </summary>
         public string Topic { get; set; }
+
+        /// <summary>
+        /// Delay to wait before redelivering messages that failed to be processed.
+        /// When an application uses IConsumer.NegativeAcknowledge(Message), failed messages are redelivered after a fixed timeout.
+        /// </summary>
+        public int NegativeAckRedeliveryDelayMicros { get; set; }
+
+        /// <summary>
+        /// Timeout of unacked messages
+        /// </summary>
+        public int AckTimeoutMillis { get; set; }

Review comment:
       Let's make this a TimeSpan and call it "AcknowledgementTimeout"
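
For illustration only, the two renames suggested for `ConsumerOptions` could look roughly like this. The class below is a trimmed-down, hypothetical stand-in (the real `ConsumerOptions` has more members), and using `TimeSpan.Zero` as the "disabled" default is an assumption, not something decided in this thread.

```csharp
using System;

// Hypothetical, trimmed-down stand-in for ConsumerOptions; only the two reviewed properties are shown.
public sealed class ConsumerOptionsSketch
{
    /// <summary>
    /// Delay to wait before redelivering messages that were negatively acknowledged.
    /// TimeSpan.Zero is assumed here to mean "not configured".
    /// </summary>
    public TimeSpan NegativeAcknowledgementRedeliveryDelay { get; set; } = TimeSpan.Zero;

    /// <summary>
    /// How long a message may stay unacknowledged before redelivery is requested.
    /// TimeSpan.Zero is assumed here to mean "not configured".
    /// </summary>
    public TimeSpan AcknowledgementTimeout { get; set; } = TimeSpan.Zero;
}
```

A `TimeSpan` carries its own unit, so the `Micros`/`Millis` suffixes on the property names are no longer needed.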

##########
File path: src/DotPulsar/Internal/Abstractions/IMessageAcksTracker.cs
##########
@@ -0,0 +1,13 @@
+namespace DotPulsar.Internal.Abstractions
+{
+    using DotPulsar.Abstractions;
+    using System.Threading;
+    using System.Threading.Tasks;
+    public interface IMessageAcksTracker<T>

Review comment:
       Let's have a single space between using and class declarations.
   I'm not sure this interface is needed, and why do Add, Ack and Nack return T?
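
One possible shape, sketched under assumptions: if the interface is kept (for example so that a no-op tracker can be swapped in when tracking is not configured), it could be non-generic with `void` methods. All type names below are hypothetical stand-ins so the block compiles on its own; they are not the types in the PR.

```csharp
using System;

// Stand-in for DotPulsar's MessageId so the sketch compiles on its own.
public sealed class MessageIdSketch { }

// Non-generic and void-returning: callers already hold the id they pass in.
public interface IUnackedMessageTrackerSketch : IDisposable
{
    void Add(MessageIdSketch messageId);
    void Ack(MessageIdSketch messageId);
}

// No-op implementation for consumers that have no ack timeout configured.
public sealed class InactiveTrackerSketch : IUnackedMessageTrackerSketch
{
    public void Add(MessageIdSketch messageId) { }

    public void Ack(MessageIdSketch messageId) { }

    public void Dispose() { }
}
```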

##########
File path: src/DotPulsar/Internal/Abstractions/IMessageAcksTracker.cs
##########
@@ -0,0 +1,13 @@
+namespace DotPulsar.Internal.Abstractions
+{
+    using DotPulsar.Abstractions;
+    using System.Threading;
+    using System.Threading.Tasks;
+    public interface IMessageAcksTracker<T>
+    {
+        T Add(T message);
+        T Ack(T message);
+        T Nack(T message);
+        Task StartTracker(IConsumer consumer, CancellationToken cancellationToken);
+    }
+}

Review comment:
       Needs a new line

##########
File path: src/DotPulsar/Internal/Abstractions/IMessageQueue.cs
##########
@@ -0,0 +1,10 @@
+namespace DotPulsar.Internal.Abstractions

Review comment:
       We need the Apache header at the top of every file

##########
File path: src/DotPulsar/Internal/Abstractions/IUnackedMessageTracker.cs
##########
@@ -0,0 +1,16 @@
+namespace DotPulsar.Internal.Abstractions

Review comment:
       We need the Apache header at the top of every file

##########
File path: src/DotPulsar/Internal/InactiveMessageAcksTracker.cs
##########
@@ -0,0 +1,30 @@
+namespace DotPulsar.Internal

Review comment:
       We need the Apache header at the top of every file

##########
File path: src/DotPulsar/Internal/InactiveMessageAcksTracker.cs
##########
@@ -0,0 +1,30 @@
+namespace DotPulsar.Internal
+{
+    using Abstractions;
+    using DotPulsar.Abstractions;
+    using DotPulsar.Exceptions;
+    using Events;
+    using Microsoft.Extensions.ObjectPool;
+    using PulsarApi;
+    using System;
+    using System.Collections.Generic;
+    using System.Linq;
+    using System.Runtime.CompilerServices;
+    using System.Threading;
+    using System.Threading.Tasks;
+    using System.Diagnostics;
+
+    public sealed class InactiveMessageAcksTracker : IMessageAcksTracker<MessageId>
+    {
+        public InactiveMessageAcksTracker() { }

Review comment:
       Empty constructors can be deleted

##########
File path: src/DotPulsar/Internal/Abstractions/IMessageQueue.cs
##########
@@ -0,0 +1,10 @@
+namespace DotPulsar.Internal.Abstractions
+{
+    using System;
+
+    public interface IMessageQueue : IDequeue<MessagePackage>, IDisposable
+    {
+        void Acknowledge(MessageId obj);
+        void NegativeAcknowledge(MessageId obj);
+    }
+}

Review comment:
       Needs a new line

##########
File path: src/DotPulsar/Internal/InactiveUnackedMessageTracker.cs
##########
@@ -0,0 +1,31 @@
+namespace DotPulsar.Internal

Review comment:
       We need the Apache header at the top of every file

##########
File path: src/DotPulsar/Internal/ConsumerChannel.cs
##########
@@ -108,6 +108,9 @@ public async Task Send(CommandAck command, CancellationToken cancellationToken)
             }
 
             command.ConsumerId = _id;
+
+            _queue.Acknowledge(new MessageId(messageId));

Review comment:
       Some operations need to be as fast as possible. For consumers, this is receiving and acknowledging messages. Let's work with "messageId" instead of having to create a MessageId.
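
As a rough sketch of the idea (an assumption about how it could be done, not the PR's code): the tracker can key its entries on a value tuple built from the wire-level id's fields, so the acknowledge path never has to construct a `MessageId` wrapper. `MessageIdDataSketch` and its property names are hypothetical stand-ins for the "messageId" object in the diff.

```csharp
using System.Collections.Concurrent;
using System.Diagnostics;

// Hypothetical stand-in for the wire-level id; the property names are assumptions.
public sealed class MessageIdDataSketch
{
    public ulong LedgerId { get; set; }
    public ulong EntryId { get; set; }
    public int Partition { get; set; }
    public int BatchIndex { get; set; }
}

public sealed class AckTrackerSketch
{
    // Maps an id key to the Stopwatch timestamp at which tracking started.
    private readonly ConcurrentDictionary<(ulong, ulong, int, int), long> _pending =
        new ConcurrentDictionary<(ulong, ulong, int, int), long>();

    // A value tuple is a struct, so building the key does not allocate a wrapper object.
    private static (ulong, ulong, int, int) Key(MessageIdDataSketch id)
        => (id.LedgerId, id.EntryId, id.Partition, id.BatchIndex);

    public void Add(MessageIdDataSketch id) => _pending.TryAdd(Key(id), Stopwatch.GetTimestamp());

    // Returns true if the id was being tracked.
    public bool Acknowledge(MessageIdDataSketch id) => _pending.TryRemove(Key(id), out _);

    public int Count => _pending.Count;
}
```

The dictionary itself still allocates when an entry is added; the sketch only avoids the extra `new MessageId(...)` on each acknowledge.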

##########
File path: src/DotPulsar/Internal/Abstractions/IMessageQueue.cs
##########
@@ -0,0 +1,10 @@
+namespace DotPulsar.Internal.Abstractions
+{
+    using System;
+
+    public interface IMessageQueue : IDequeue<MessagePackage>, IDisposable
+    {
+        void Acknowledge(MessageId obj);

Review comment:
       Let's call them "messageId" instead of "obj".

##########
File path: src/DotPulsar/Internal/MessageAcksTracker.cs
##########
@@ -0,0 +1,103 @@
+namespace DotPulsar.Internal
+{
+    using Abstractions;
+    using DotPulsar.Abstractions;
+    using DotPulsar.Exceptions;
+    using Events;
+    using Microsoft.Extensions.ObjectPool;
+    using PulsarApi;
+    using System;
+    using System.Collections.Generic;
+    using System.Linq;
+    using System.Runtime.CompilerServices;
+    using System.Threading;
+    using System.Threading.Tasks;
+    using System.Diagnostics;
+
+    internal class Tracker
+    {
+        private readonly Stopwatch _timer;
+        private int maxTimeoutMs;
+
+        public Tracker(int timeoutMs)
+        {
+            maxTimeoutMs = timeoutMs;
+            _timer = new Stopwatch();
+            _timer.Start();
+        }
+
+        public bool IsTimedOut() => _timer.ElapsedMilliseconds > maxTimeoutMs;
+
+        public long msTillTimeout => maxTimeoutMs - _timer.ElapsedMilliseconds;
+
+        public void Reset(int newTimeoutMs)
+        {
+            maxTimeoutMs = newTimeoutMs;
+            _timer.Restart();
+        }
+    }
+
+    // TODO add mechnism to stop tracker when disposed
+    public sealed class MessageAcksTracker : IMessageAcksTracker<MessageId>

Review comment:
       Every class should have its own file

##########
File path: src/DotPulsar/Internal/MessageAcksTracker.cs
##########
@@ -0,0 +1,103 @@
+namespace DotPulsar.Internal
+{
+    using Abstractions;
+    using DotPulsar.Abstractions;
+    using DotPulsar.Exceptions;
+    using Events;
+    using Microsoft.Extensions.ObjectPool;
+    using PulsarApi;
+    using System;
+    using System.Collections.Generic;
+    using System.Linq;
+    using System.Runtime.CompilerServices;
+    using System.Threading;
+    using System.Threading.Tasks;
+    using System.Diagnostics;
+
+    internal class Tracker
+    {
+        private readonly Stopwatch _timer;
+        private int maxTimeoutMs;

Review comment:
       Every field should start with an underscore.
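
A minimal sketch of the per-message timer with that naming applied, assuming it moves to its own file. The class name and the switch from `int` milliseconds to `TimeSpan` are illustrative choices, not taken from the PR.

```csharp
using System;
using System.Diagnostics;

// Hypothetical rewrite of the Tracker class from the hunk above.
public sealed class TrackerSketch
{
    private readonly Stopwatch _timer;
    private TimeSpan _timeout;

    public TrackerSketch(TimeSpan timeout)
    {
        _timeout = timeout;
        _timer = Stopwatch.StartNew();
    }

    // True once more time has elapsed than the configured timeout.
    public bool IsTimedOut => _timer.Elapsed > _timeout;

    // Remaining time; negative once the timeout has passed.
    public TimeSpan TimeUntilTimeout => _timeout - _timer.Elapsed;

    public void Reset(TimeSpan newTimeout)
    {
        _timeout = newTimeout;
        _timer.Restart();
    }
}
```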

##########
File path: src/DotPulsar/Internal/MessageQueue.cs
##########
@@ -0,0 +1,45 @@
+namespace DotPulsar.Internal
+{
+    using Abstractions;
+    using DotPulsar.Abstractions;
+    using DotPulsar.Exceptions;
+    using Events;
+    using Microsoft.Extensions.ObjectPool;
+    using PulsarApi;
+    using System;
+    using System.Collections.Generic;
+    using System.Linq;
+    using System.Runtime.CompilerServices;
+    using System.Threading;
+    using System.Threading.Tasks;
+
+    public sealed class MessageQueue : IMessageQueue, IDequeue<MessagePackage>, IDisposable
+    {
+        private readonly AsyncQueue<MessagePackage> _queue;
+        private readonly IUnackedMessageTracker _tracker;
+        public MessageQueue(AsyncQueue<MessagePackage> queue, IUnackedMessageTracker tracker)
+        {
+            _queue = queue;
+            _tracker = tracker;
+        }
+        public async ValueTask<MessagePackage> Dequeue(CancellationToken cancellationToken = default)
+        {
+            var message = await _queue.Dequeue(cancellationToken).ConfigureAwait(false);
+            _tracker.Add(new MessageId(message.MessageId));
+            return message;
+        }
+        public void Acknowledge(MessageId obj) => _tracker.Ack(obj);
+
+        public void NegativeAcknowledge(MessageId obj)
+        {
+            throw new NotImplementedException();
+        }
+
+        public void Dispose()
+        {
+            _queue.Dispose();
+            _tracker.Dispose();
+        }
+        
+    }
+}

Review comment:
       Needs a new line

##########
File path: src/DotPulsar/Internal/MessageAcksTracker.cs
##########
@@ -0,0 +1,103 @@
+namespace DotPulsar.Internal
+{
+    using Abstractions;
+    using DotPulsar.Abstractions;
+    using DotPulsar.Exceptions;
+    using Events;
+    using Microsoft.Extensions.ObjectPool;
+    using PulsarApi;
+    using System;
+    using System.Collections.Generic;
+    using System.Linq;
+    using System.Runtime.CompilerServices;
+    using System.Threading;
+    using System.Threading.Tasks;
+    using System.Diagnostics;
+
+    internal class Tracker
+    {
+        private readonly Stopwatch _timer;
+        private int maxTimeoutMs;
+
+        public Tracker(int timeoutMs)
+        {
+            maxTimeoutMs = timeoutMs;
+            _timer = new Stopwatch();
+            _timer.Start();
+        }
+
+        public bool IsTimedOut() => _timer.ElapsedMilliseconds > maxTimeoutMs;
+
+        public long msTillTimeout => maxTimeoutMs - _timer.ElapsedMilliseconds;
+
+        public void Reset(int newTimeoutMs)
+        {
+            maxTimeoutMs = newTimeoutMs;
+            _timer.Restart();
+        }
+    }
+
+    // TODO add mechnism to stop tracker when disposed
+    public sealed class MessageAcksTracker : IMessageAcksTracker<MessageId>
+    {
+        private readonly Dictionary<MessageId, Tracker> _trackers;
+        private readonly int _unackedTimeoutMs;
+        private readonly int _nackTimeoutMs;
+        private readonly int _trackerDelayMs;
+        public MessageAcksTracker(int unackedTimeoutMs, int nackTimeoutMs, int trackerDelayMs)
+        {
+            _unackedTimeoutMs = unackedTimeoutMs;
+            _nackTimeoutMs = nackTimeoutMs;
+            _trackerDelayMs = trackerDelayMs;
+            _trackers = new Dictionary<MessageId, Tracker>();
+        }
+
+        public async Task StartTracker(IConsumer consumer, CancellationToken cancellationToken)
+        {
+            await Task.Yield();
+
+            while (true)
+            {
+                await Task.Delay(_trackerDelayMs);
+
+                var messageIds = new List<MessageId>();
+                foreach (KeyValuePair<MessageId, Tracker> p in _trackers)
+                {
+                    if (p.Value.IsTimedOut())
+                        messageIds.Add(p.Key);
+                }
+
+                if (messageIds.Count() > 0)
+                    await consumer.RedeliverUnacknowledgedMessages(messageIds, cancellationToken).ConfigureAwait(false);
+
+            }
+        }
+        public MessageId Add(MessageId message)
+        {
+            if (!_trackers.ContainsKey(message))
+            {
+                _trackers.Add(message, new Tracker(_unackedTimeoutMs));
+            }
+
+            return message;
+        }
+        public MessageId Ack(MessageId message)
+        {
+            if (_trackers.ContainsKey(message))
+                _trackers.Remove(message);
+            return message;
+        }
+        public MessageId Nack(MessageId message)
+        {
+            if (_trackers.ContainsKey(message))
+            {
+                var timer = _trackers[message];
+                if (timer.msTillTimeout > _nackTimeoutMs)
+                    timer.Reset(_nackTimeoutMs);
+            }
+            else
+                _trackers.Add(message, new Tracker(_nackTimeoutMs));
+            return message;
+        }
+    }
+}

Review comment:
       Needs a new line

##########
File path: src/DotPulsar/Internal/MessageQueue.cs
##########
@@ -0,0 +1,45 @@
+namespace DotPulsar.Internal
+{
+    using Abstractions;
+    using DotPulsar.Abstractions;
+    using DotPulsar.Exceptions;
+    using Events;
+    using Microsoft.Extensions.ObjectPool;
+    using PulsarApi;
+    using System;
+    using System.Collections.Generic;
+    using System.Linq;
+    using System.Runtime.CompilerServices;
+    using System.Threading;
+    using System.Threading.Tasks;
+
+    public sealed class MessageQueue : IMessageQueue, IDequeue<MessagePackage>, IDisposable
+    {
+        private readonly AsyncQueue<MessagePackage> _queue;
+        private readonly IUnackedMessageTracker _tracker;
+        public MessageQueue(AsyncQueue<MessagePackage> queue, IUnackedMessageTracker tracker)
+        {
+            _queue = queue;
+            _tracker = tracker;
+        }
+        public async ValueTask<MessagePackage> Dequeue(CancellationToken cancellationToken = default)
+        {
+            var message = await _queue.Dequeue(cancellationToken).ConfigureAwait(false);
+            _tracker.Add(new MessageId(message.MessageId));
+            return message;
+        }
+        public void Acknowledge(MessageId obj) => _tracker.Ack(obj);

Review comment:
       Needs spacing

##########
File path: src/DotPulsar/Internal/MessageAcksTracker.cs
##########
@@ -0,0 +1,103 @@
+namespace DotPulsar.Internal
+{
+    using Abstractions;
+    using DotPulsar.Abstractions;
+    using DotPulsar.Exceptions;
+    using Events;
+    using Microsoft.Extensions.ObjectPool;
+    using PulsarApi;
+    using System;
+    using System.Collections.Generic;
+    using System.Linq;
+    using System.Runtime.CompilerServices;
+    using System.Threading;
+    using System.Threading.Tasks;
+    using System.Diagnostics;
+
+    internal class Tracker
+    {
+        private readonly Stopwatch _timer;
+        private int maxTimeoutMs;
+
+        public Tracker(int timeoutMs)
+        {
+            maxTimeoutMs = timeoutMs;
+            _timer = new Stopwatch();
+            _timer.Start();
+        }
+
+        public bool IsTimedOut() => _timer.ElapsedMilliseconds > maxTimeoutMs;
+
+        public long msTillTimeout => maxTimeoutMs - _timer.ElapsedMilliseconds;
+
+        public void Reset(int newTimeoutMs)
+        {
+            maxTimeoutMs = newTimeoutMs;
+            _timer.Restart();
+        }
+    }
+
+    // TODO add mechnism to stop tracker when disposed
+    public sealed class MessageAcksTracker : IMessageAcksTracker<MessageId>
+    {
+        private readonly Dictionary<MessageId, Tracker> _trackers;
+        private readonly int _unackedTimeoutMs;
+        private readonly int _nackTimeoutMs;
+        private readonly int _trackerDelayMs;
+        public MessageAcksTracker(int unackedTimeoutMs, int nackTimeoutMs, int trackerDelayMs)
+        {
+            _unackedTimeoutMs = unackedTimeoutMs;
+            _nackTimeoutMs = nackTimeoutMs;
+            _trackerDelayMs = trackerDelayMs;
+            _trackers = new Dictionary<MessageId, Tracker>();
+        }
+
+        public async Task StartTracker(IConsumer consumer, CancellationToken cancellationToken)
+        {
+            await Task.Yield();
+
+            while (true)
+            {
+                await Task.Delay(_trackerDelayMs);
+
+                var messageIds = new List<MessageId>();
+                foreach (KeyValuePair<MessageId, Tracker> p in _trackers)
+                {
+                    if (p.Value.IsTimedOut())
+                        messageIds.Add(p.Key);
+                }
+
+                if (messageIds.Count() > 0)
+                    await consumer.RedeliverUnacknowledgedMessages(messageIds, cancellationToken).ConfigureAwait(false);
+
+            }
+        }
+        public MessageId Add(MessageId message)
+        {
+            if (!_trackers.ContainsKey(message))
+            {

Review comment:
       No need for braces around a single-line if-statement.

##########
File path: src/DotPulsar/Internal/UnackedMessageTracker.cs
##########
@@ -0,0 +1,101 @@
+namespace DotPulsar.Internal

Review comment:
       We need the Apache header at the top of every file

##########
File path: tests/DotPulsar.Tests/Internal/MessageAcksTrackerTests.cs
##########
@@ -0,0 +1,172 @@
+namespace DotPulsar.Tests.Internal
+{
+    using DotPulsar.Internal;
+    using DotPulsar.Abstractions;
+    using FluentAssertions;
+    using Xunit;
+    using System;
+    using AutoFixture;
+    using System.Threading;
+    using System.Threading.Tasks;
+    using System.Collections.Generic;
+    using System.Linq;
+    using System.Linq.Expressions;
+    using AutoFixture.AutoNSubstitute;
+    using NSubstitute;
+    using System.Diagnostics;
+
+    public class UnackedMessageTrackerTests
+    {
+        [Fact]
+        public void Test_Instance()

Review comment:
       I'm not sure this test makes much sense. If you call a constructor and it doesn't throw an exception, then the type returned is always what you expect.

##########
File path: src/DotPulsar/Internal/InactiveMessageAcksTracker.cs
##########
@@ -0,0 +1,30 @@
+namespace DotPulsar.Internal
+{
+    using Abstractions;
+    using DotPulsar.Abstractions;
+    using DotPulsar.Exceptions;
+    using Events;
+    using Microsoft.Extensions.ObjectPool;
+    using PulsarApi;
+    using System;
+    using System.Collections.Generic;
+    using System.Linq;
+    using System.Runtime.CompilerServices;
+    using System.Threading;
+    using System.Threading.Tasks;
+    using System.Diagnostics;
+
+    public sealed class InactiveMessageAcksTracker : IMessageAcksTracker<MessageId>
+    {
+        public InactiveMessageAcksTracker() { }
+
+        public async Task StartTracker(IConsumer consumer, CancellationToken cancellationToken)
+        {
+            await Task.Yield();
+        }
+
+        public MessageId Add(MessageId message) => message;
+        public MessageId Ack(MessageId message) => message;
+        public MessageId Nack(MessageId message) => message;
+    }
+}

Review comment:
       Needs a new line

##########
File path: src/DotPulsar/Internal/MessageAcksTracker.cs
##########
@@ -0,0 +1,103 @@
+namespace DotPulsar.Internal
+{
+    using Abstractions;
+    using DotPulsar.Abstractions;
+    using DotPulsar.Exceptions;
+    using Events;
+    using Microsoft.Extensions.ObjectPool;
+    using PulsarApi;
+    using System;
+    using System.Collections.Generic;
+    using System.Linq;
+    using System.Runtime.CompilerServices;
+    using System.Threading;
+    using System.Threading.Tasks;
+    using System.Diagnostics;
+
+    internal class Tracker
+    {
+        private readonly Stopwatch _timer;
+        private int maxTimeoutMs;
+
+        public Tracker(int timeoutMs)
+        {
+            maxTimeoutMs = timeoutMs;
+            _timer = new Stopwatch();

Review comment:
       A stopwatch can be created and started in one go: Stopwatch.StartNew();
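
   As a minimal sketch of the suggestion, the two forms can be compared side by side. The class and method names below (`StopwatchSketch`, `StartTimer`) are hypothetical and not part of the pull request; only `Stopwatch` and its members come from `System.Diagnostics`.

   ```csharp
   using System;
   using System.Diagnostics;

   public static class StopwatchSketch
   {
       // Hypothetical helper: StartNew returns a Stopwatch that is already running.
       public static Stopwatch StartTimer() => Stopwatch.StartNew();

       public static void Main()
       {
           // Two-step form used in the diff: construct, then start.
           var twoStep = new Stopwatch();
           twoStep.Start();

           // One-step form: equivalent result with a single call.
           var oneStep = StartTimer();

           Console.WriteLine(twoStep.IsRunning);
           Console.WriteLine(oneStep.IsRunning);
       }
   }
   ```

   Both stopwatches report that they are running, so the constructor in `Tracker` could likely assign `_timer = Stopwatch.StartNew();` and drop the separate `Start()` call.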

##########
File path: src/DotPulsar/Internal/InactiveUnackedMessageTracker.cs
##########
@@ -0,0 +1,31 @@
+namespace DotPulsar.Internal
+{
+    using System.Threading;

Review comment:
       Usings should be sorted alphabetically. If you run a code cleanup, this should be handled for you automatically.

##########
File path: src/DotPulsar/Internal/MessageQueue.cs
##########
@@ -0,0 +1,45 @@
+namespace DotPulsar.Internal

Review comment:
       We need the Apache header at the top of every file

##########
File path: src/DotPulsar/Internal/InactiveUnackedMessageTracker.cs
##########
@@ -0,0 +1,31 @@
+namespace DotPulsar.Internal
+{
+    using System.Threading;
+    using System.Threading.Tasks;
+    using Abstractions;
+    using DotPulsar.Abstractions;
+
+    public class InactiveUnackedMessageTracker : IUnackedMessageTracker
+    {
+        public InactiveUnackedMessageTracker()
+        {
+        }
+
+        public void Ack(MessageId messageId)
+        {
+            return;
+        }
+
+        public void Add(MessageId messageId)
+        {
+            return;
+        }
+
+        public Task Start(IConsumer consumer, CancellationToken cancellationToken = default) => Task.CompletedTask;
+
+        public void Dispose() {
+            return;
+        }
+

Review comment:
       Unnecessary spacing

##########
File path: src/DotPulsar/Internal/MessageAcksTracker.cs
##########
@@ -0,0 +1,103 @@
+namespace DotPulsar.Internal
+{
+    using Abstractions;
+    using DotPulsar.Abstractions;
+    using DotPulsar.Exceptions;
+    using Events;
+    using Microsoft.Extensions.ObjectPool;
+    using PulsarApi;
+    using System;
+    using System.Collections.Generic;
+    using System.Linq;
+    using System.Runtime.CompilerServices;
+    using System.Threading;
+    using System.Threading.Tasks;
+    using System.Diagnostics;
+
+    internal class Tracker

Review comment:
       Everything in the "Internal" namespace is internal, so no need to mark it as internal.

##########
File path: src/DotPulsar/Internal/MessageQueue.cs
##########
@@ -0,0 +1,45 @@
+namespace DotPulsar.Internal
+{
+    using Abstractions;
+    using DotPulsar.Abstractions;
+    using DotPulsar.Exceptions;
+    using Events;
+    using Microsoft.Extensions.ObjectPool;
+    using PulsarApi;
+    using System;
+    using System.Collections.Generic;
+    using System.Linq;
+    using System.Runtime.CompilerServices;
+    using System.Threading;
+    using System.Threading.Tasks;
+
+    public sealed class MessageQueue : IMessageQueue, IDequeue<MessagePackage>, IDisposable
+    {
+        private readonly AsyncQueue<MessagePackage> _queue;
+        private readonly IUnackedMessageTracker _tracker;
+        public MessageQueue(AsyncQueue<MessagePackage> queue, IUnackedMessageTracker tracker)

Review comment:
       Needs spacing

##########
File path: src/DotPulsar/Internal/MessageAcksTracker.cs
##########
@@ -0,0 +1,103 @@
+namespace DotPulsar.Internal
+{
+    using Abstractions;
+    using DotPulsar.Abstractions;
+    using DotPulsar.Exceptions;
+    using Events;
+    using Microsoft.Extensions.ObjectPool;
+    using PulsarApi;
+    using System;
+    using System.Collections.Generic;
+    using System.Linq;
+    using System.Runtime.CompilerServices;
+    using System.Threading;
+    using System.Threading.Tasks;
+    using System.Diagnostics;
+
+    internal class Tracker
+    {
+        private readonly Stopwatch _timer;
+        private int maxTimeoutMs;
+
+        public Tracker(int timeoutMs)
+        {
+            maxTimeoutMs = timeoutMs;
+            _timer = new Stopwatch();
+            _timer.Start();
+        }
+
+        public bool IsTimedOut() => _timer.ElapsedMilliseconds > maxTimeoutMs;
+
+        public long msTillTimeout => maxTimeoutMs - _timer.ElapsedMilliseconds;
+
+        public void Reset(int newTimeoutMs)
+        {
+            maxTimeoutMs = newTimeoutMs;
+            _timer.Restart();
+        }
+    }
+
+    // TODO add mechnism to stop tracker when disposed
+    public sealed class MessageAcksTracker : IMessageAcksTracker<MessageId>
+    {
+        private readonly Dictionary<MessageId, Tracker> _trackers;
+        private readonly int _unackedTimeoutMs;
+        private readonly int _nackTimeoutMs;
+        private readonly int _trackerDelayMs;
+        public MessageAcksTracker(int unackedTimeoutMs, int nackTimeoutMs, int trackerDelayMs)
+        {
+            _unackedTimeoutMs = unackedTimeoutMs;
+            _nackTimeoutMs = nackTimeoutMs;
+            _trackerDelayMs = trackerDelayMs;
+            _trackers = new Dictionary<MessageId, Tracker>();
+        }
+
+        public async Task StartTracker(IConsumer consumer, CancellationToken cancellationToken)
+        {
+            await Task.Yield();
+
+            while (true)
+            {
+                await Task.Delay(_trackerDelayMs);
+
+                var messageIds = new List<MessageId>();
+                foreach (KeyValuePair<MessageId, Tracker> p in _trackers)
+                {
+                    if (p.Value.IsTimedOut())
+                        messageIds.Add(p.Key);
+                }
+
+                if (messageIds.Count() > 0)
+                    await consumer.RedeliverUnacknowledgedMessages(messageIds, cancellationToken).ConfigureAwait(false);
+
+            }
+        }
+        public MessageId Add(MessageId message)
+        {
+            if (!_trackers.ContainsKey(message))
+            {
+                _trackers.Add(message, new Tracker(_unackedTimeoutMs));
+            }
+
+            return message;
+        }
+        public MessageId Ack(MessageId message)

Review comment:
       Let's have a single space between methods.

##########
File path: tests/DotPulsar.StressTests/DotPulsar.StressTests.csproj
##########
@@ -7,6 +7,7 @@
 
   <ItemGroup>
     <PackageReference Include="Microsoft.NET.Test.Sdk" Version="16.8.3" />
+    <PackageReference Include="Microsoft.Bcl.AsyncInterfaces" Version="5.0.0" />

Review comment:
       I think this is a leftover from Visual Studio Code?

##########
File path: src/DotPulsar/Internal/MessageAcksTracker.cs
##########
@@ -0,0 +1,103 @@
+namespace DotPulsar.Internal

Review comment:
       We need the Apache header at the top of every file

##########
File path: src/DotPulsar/Internal/MessageAcksTracker.cs
##########
@@ -0,0 +1,103 @@
+namespace DotPulsar.Internal
+{
+    using Abstractions;
+    using DotPulsar.Abstractions;
+    using DotPulsar.Exceptions;
+    using Events;
+    using Microsoft.Extensions.ObjectPool;
+    using PulsarApi;
+    using System;
+    using System.Collections.Generic;
+    using System.Linq;
+    using System.Runtime.CompilerServices;
+    using System.Threading;
+    using System.Threading.Tasks;
+    using System.Diagnostics;
+
+    internal class Tracker
+    {
+        private readonly Stopwatch _timer;
+        private int maxTimeoutMs;
+
+        public Tracker(int timeoutMs)
+        {
+            maxTimeoutMs = timeoutMs;
+            _timer = new Stopwatch();
+            _timer.Start();
+        }
+
+        public bool IsTimedOut() => _timer.ElapsedMilliseconds > maxTimeoutMs;
+
+        public long msTillTimeout => maxTimeoutMs - _timer.ElapsedMilliseconds;
+
+        public void Reset(int newTimeoutMs)
+        {
+            maxTimeoutMs = newTimeoutMs;
+            _timer.Restart();
+        }
+    }
+
+    // TODO add mechnism to stop tracker when disposed
+    public sealed class MessageAcksTracker : IMessageAcksTracker<MessageId>
+    {
+        private readonly Dictionary<MessageId, Tracker> _trackers;
+        private readonly int _unackedTimeoutMs;
+        private readonly int _nackTimeoutMs;
+        private readonly int _trackerDelayMs;
+        public MessageAcksTracker(int unackedTimeoutMs, int nackTimeoutMs, int trackerDelayMs)

Review comment:
       Let's have a single space between the declaration of fields and constructors

##########
File path: src/DotPulsar/Internal/MessageQueue.cs
##########
@@ -0,0 +1,45 @@
+namespace DotPulsar.Internal
+{
+    using Abstractions;
+    using DotPulsar.Abstractions;
+    using DotPulsar.Exceptions;
+    using Events;
+    using Microsoft.Extensions.ObjectPool;
+    using PulsarApi;
+    using System;
+    using System.Collections.Generic;
+    using System.Linq;
+    using System.Runtime.CompilerServices;
+    using System.Threading;
+    using System.Threading.Tasks;
+
+    public sealed class MessageQueue : IMessageQueue, IDequeue<MessagePackage>, IDisposable
+    {
+        private readonly AsyncQueue<MessagePackage> _queue;
+        private readonly IUnackedMessageTracker _tracker;
+        public MessageQueue(AsyncQueue<MessagePackage> queue, IUnackedMessageTracker tracker)
+        {
+            _queue = queue;
+            _tracker = tracker;
+        }
+        public async ValueTask<MessagePackage> Dequeue(CancellationToken cancellationToken = default)
+        {
+            var message = await _queue.Dequeue(cancellationToken).ConfigureAwait(false);
+            _tracker.Add(new MessageId(message.MessageId));
+            return message;
+        }
+        public void Acknowledge(MessageId obj) => _tracker.Ack(obj);
+
+        public void NegativeAcknowledge(MessageId obj)
+        {
+            throw new NotImplementedException();
+        }
+
+        public void Dispose()
+        {
+            _queue.Dispose();
+            _tracker.Dispose();
+        }
+        

Review comment:
       Unnecessary spacing

##########
File path: src/DotPulsar/Internal/UnackedMessageTracker.cs
##########
@@ -0,0 +1,101 @@
+namespace DotPulsar.Internal
+{
+    using Abstractions;
+    using DotPulsar.Abstractions;
+    using System;
+    using System.Collections.Concurrent;
+    using System.Diagnostics;
+    using System.Linq;
+    using System.Collections.Generic;
+    using System.Threading;
+    using System.Threading.Tasks;
+
+    public readonly struct AwaitingAck
+    {
+        public MessageId MessageId { get; }
+        public long Timestamp { get; }
+
+        public AwaitingAck(MessageId messageId)
+        {
+            MessageId = messageId;
+            Timestamp = Stopwatch.GetTimestamp();
+        }
+
+        public TimeSpan Elapsed => TimeSpan.FromTicks(
+            (long) ((Stopwatch.GetTimestamp() - Timestamp) / (double)Stopwatch.Frequency * TimeSpan.TicksPerSecond));
+    }
+
+    public sealed class UnackedMessageTracker : IUnackedMessageTracker
+    {
+        private readonly TimeSpan _ackTimeout;
+        private readonly TimeSpan _pollingTimeout;
+        private readonly ConcurrentQueue<AwaitingAck> _awaitingAcks;
+        private readonly List<MessageId> _acked;
+        private readonly CancellationTokenSource _cancellationTokenSource;
+
+        
+        public UnackedMessageTracker(TimeSpan ackTimeout, TimeSpan pollingTimeout)
+        {
+            _ackTimeout = ackTimeout;
+            _pollingTimeout = pollingTimeout;
+            _awaitingAcks = new ConcurrentQueue<AwaitingAck>();
+            _acked = new List<MessageId>();
+            _cancellationTokenSource = new CancellationTokenSource();
+        }
+
+        public void Add(MessageId messageId)
+        {
+            _awaitingAcks.Enqueue(new AwaitingAck(messageId));
+        }
+
+        public void Ack(MessageId messageId)
+        {
+            // We only need to store the highest cumulative ack we see (if there is one)
+            // and the MessageIds not included by that cumulative ack.
+            _acked.Add(messageId);
+        }
+
+        public Task Start(IConsumer consumer, CancellationToken cancellationToken = default)
+        {
+            CancellationToken token =
+              CancellationTokenSource.CreateLinkedTokenSource(
+                  _cancellationTokenSource.Token, cancellationToken).Token;
+
+            return Task.Run(async () => {

Review comment:
       Starting a task here doesn't really make much sense. If you await "Start", then the result is the same whether you create this task or not.
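
   A rough sketch of the point, under the assumption that the polling loop can be written as a plain async method: an async method already returns a Task that completes when the loop ends, so awaiting it behaves the same with or without a `Task.Run` wrapper. `PollingSketch` and `Poll` are hypothetical names used for illustration; they are not the tracker's actual API.

   ```csharp
   using System;
   using System.Threading;
   using System.Threading.Tasks;

   public static class PollingSketch
   {
       // Hypothetical stand-in for the tracker's Start method. Being async, it
       // already returns a Task, so no Task.Run wrapper is needed.
       public static async Task<int> Poll(TimeSpan interval, CancellationToken token)
       {
           var ticks = 0;
           while (!token.IsCancellationRequested)
           {
               try
               {
                   await Task.Delay(interval, token).ConfigureAwait(false);
               }
               catch (OperationCanceledException)
               {
                   break; // cancellation ends the loop instead of propagating
               }

               ticks++; // a real tracker would check for timed-out messages here
           }

           return ticks;
       }

       public static async Task Main()
       {
           using var cts = new CancellationTokenSource(TimeSpan.FromMilliseconds(100));
           var ticks = await Poll(TimeSpan.FromMilliseconds(10), cts.Token);
           Console.WriteLine($"Polled {ticks} times before cancellation");
       }
   }
   ```

   One difference worth keeping in mind: without `Task.Run`, the code before the first `await` runs synchronously on the caller's thread, which is usually fine when that code is as short as it is here.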

##########
File path: tests/DotPulsar.Tests/Internal/MessageAcksTrackerTests.cs
##########
@@ -0,0 +1,172 @@
+namespace DotPulsar.Tests.Internal
+{
+    using DotPulsar.Internal;
+    using DotPulsar.Abstractions;
+    using FluentAssertions;
+    using Xunit;
+    using System;
+    using AutoFixture;
+    using System.Threading;
+    using System.Threading.Tasks;
+    using System.Collections.Generic;
+    using System.Linq;
+    using System.Linq.Expressions;
+    using AutoFixture.AutoNSubstitute;
+    using NSubstitute;
+    using System.Diagnostics;
+
+    public class UnackedMessageTrackerTests
+    {
+        [Fact]
+        public void Test_Instance()
+        {
+            var tracker = new UnackedMessageTracker(TimeSpan.FromSeconds(10), TimeSpan.FromSeconds(1));
+            tracker.Should().BeOfType<UnackedMessageTracker>();
+        }
+
+
+        [Fact]
+        public async void Test_AwaitingAck_Elapsed()
+        {
+            //Arrange
+            var messageId = MessageId.Latest;
+            var sw = new Stopwatch();
+            sw.Start();
+
+            //Act
+            var awaiting = new AwaitingAck(messageId);
+            await Task.Delay(TimeSpan.FromMilliseconds(123));
+            sw.Stop();
+
+            //Assert
+            awaiting.Elapsed.Should().BeCloseTo(sw.Elapsed, 1);
+        }
+
+        [Fact]
+        public async void Test_Start_Message()
+        {
+            //Arrange
+            var fixture = new Fixture();
+            fixture.Customize(new AutoNSubstituteCustomization());
+            var consumer = Substitute.For<IConsumer>();
+            var messageId = MessageId.Latest;
+            var cts = new CancellationTokenSource();
+
+
+            var tracker = new UnackedMessageTracker(
+                TimeSpan.FromMilliseconds(10),
+                TimeSpan.FromMilliseconds(1));
+
+            //Act
+            tracker.Add(messageId);
+            cts.CancelAfter(20);
+            try { await tracker.Start(consumer, cts.Token); }
+            catch (TaskCanceledException) { }
+
+            //Assert
+            await consumer
+                .Received(1)
+                .RedeliverUnacknowledgedMessages(
+                    Arg.Is(EquivalentTo(new List<MessageId>() { messageId })),
+                    Arg.Any<CancellationToken>());
+        }
+
+        [Fact]
+        public async void Test_Start_Message_Ack_In_Time()
+        {
+            //Arrange
+            var fixture = new Fixture();
+            fixture.Customize(new AutoNSubstituteCustomization());
+            var consumer = Substitute.For<IConsumer>();
+            var messageId = MessageId.Latest;
+            var cts = new CancellationTokenSource();
+
+
+            var tracker = new UnackedMessageTracker(
+                TimeSpan.FromMilliseconds(10),
+                TimeSpan.FromMilliseconds(1));
+
+            //Act
+            tracker.Add(messageId);
+            cts.CancelAfter(20);
+            var _ = Task.Delay(5).ContinueWith(_ => tracker.Ack(messageId));
+            try { await tracker.Start(consumer, cts.Token); }
+            catch (TaskCanceledException) { }
+
+            //Assert
+            await consumer
+                .DidNotReceive()
+                .RedeliverUnacknowledgedMessages(
+                    Arg.Any<IEnumerable<MessageId>>(),
+                    Arg.Any<CancellationToken>());
+        }
+
+        [Fact]
+        public async void Test_Start_Message_Ack_Too_Late()
+        {
+            //Arrange
+            var fixture = new Fixture();
+            fixture.Customize(new AutoNSubstituteCustomization());
+            var consumer = Substitute.For<IConsumer>();
+            var messageId = MessageId.Latest;
+            var cts = new CancellationTokenSource();
+
+
+            var tracker = new UnackedMessageTracker(
+                TimeSpan.FromMilliseconds(10),
+                TimeSpan.FromMilliseconds(1));
+
+            //Act
+            tracker.Add(messageId);
+            cts.CancelAfter(20);
+
+            var _ = Task.Delay(15).ContinueWith(_ => tracker.Ack(messageId));
+            try { await tracker.Start(consumer, cts.Token); }
+            catch (TaskCanceledException) { }
+
+            //Assert
+            await consumer
+                .Received(1)
+                .RedeliverUnacknowledgedMessages(
+                    Arg.Any<IEnumerable<MessageId>>(),
+                    Arg.Any<CancellationToken>());
+        }
+
+        [Fact]
+        public async void Test_Start_Redeliver_Only_Cnce()
+        {
+            //Arrange
+            var fixture = new Fixture();
+            fixture.Customize(new AutoNSubstituteCustomization());
+            var consumer = Substitute.For<IConsumer>();
+            var messageId = MessageId.Latest;
+            var cts = new CancellationTokenSource();
+
+
+            var tracker = new UnackedMessageTracker(
+                TimeSpan.FromMilliseconds(10),
+                TimeSpan.FromMilliseconds(5));
+
+            //Act
+            tracker.Add(messageId);
+            cts.CancelAfter(50);
+            try { await tracker.Start(consumer, cts.Token); }
+            catch (TaskCanceledException) { }
+
+            //Assert
+            await consumer
+                .Received(1)
+                .RedeliverUnacknowledgedMessages(
+                    Arg.Any<IEnumerable<MessageId>>(),
+                    Arg.Any<CancellationToken>());
+        }
+
+
+        private Expression<Predicate<IEnumerable<T>>> EquivalentTo<T>(IEnumerable<T> enumerable) =>
+            x => IsEquivalentIEnumerable(enumerable, x);
+
+
+        private bool IsEquivalentIEnumerable<T>(IEnumerable<T> a, IEnumerable<T> b) =>
+            a.Count() == b.Count() && a.Zip(b, (a_, b_) => a_.Equals(b_)).All(_ => _);
+    }
+}

Review comment:
       Needs a new line

##########
File path: src/DotPulsar/Internal/UnackedMessageTracker.cs
##########
@@ -0,0 +1,101 @@
+namespace DotPulsar.Internal
+{
+    using Abstractions;
+    using DotPulsar.Abstractions;
+    using System;
+    using System.Collections.Concurrent;
+    using System.Diagnostics;
+    using System.Linq;
+    using System.Collections.Generic;
+    using System.Threading;
+    using System.Threading.Tasks;
+
+    public readonly struct AwaitingAck
+    {
+        public MessageId MessageId { get; }
+        public long Timestamp { get; }
+
+        public AwaitingAck(MessageId messageId)
+        {
+            MessageId = messageId;
+            Timestamp = Stopwatch.GetTimestamp();
+        }
+
+        public TimeSpan Elapsed => TimeSpan.FromTicks(
+            (long) ((Stopwatch.GetTimestamp() - Timestamp) / (double)Stopwatch.Frequency * TimeSpan.TicksPerSecond));
+    }
+
+    public sealed class UnackedMessageTracker : IUnackedMessageTracker
+    {
+        private readonly TimeSpan _ackTimeout;
+        private readonly TimeSpan _pollingTimeout;
+        private readonly ConcurrentQueue<AwaitingAck> _awaitingAcks;
+        private readonly List<MessageId> _acked;
+        private readonly CancellationTokenSource _cancellationTokenSource;
+

Review comment:
       Unnecessary double spacing, just need a single space.

##########
File path: tests/DotPulsar.Tests/Internal/MessageAcksTrackerTests.cs
##########
@@ -0,0 +1,172 @@
+namespace DotPulsar.Tests.Internal

Review comment:
       We need the Apache header at the top of every file

##########
File path: tests/DotPulsar.Tests/Internal/MessageAcksTrackerTests.cs
##########
@@ -0,0 +1,172 @@
+namespace DotPulsar.Tests.Internal
+{
+    using DotPulsar.Internal;
+    using DotPulsar.Abstractions;
+    using FluentAssertions;
+    using Xunit;
+    using System;
+    using AutoFixture;
+    using System.Threading;
+    using System.Threading.Tasks;
+    using System.Collections.Generic;
+    using System.Linq;
+    using System.Linq.Expressions;
+    using AutoFixture.AutoNSubstitute;
+    using NSubstitute;
+    using System.Diagnostics;
+
+    public class UnackedMessageTrackerTests
+    {
+        [Fact]
+        public void Test_Instance()
+        {
+            var tracker = new UnackedMessageTracker(TimeSpan.FromSeconds(10), TimeSpan.FromSeconds(1));
+            tracker.Should().BeOfType<UnackedMessageTracker>();
+        }
+
+
+        [Fact]
+        public async void Test_AwaitingAck_Elapsed()
+        {
+            //Arrange
+            var messageId = MessageId.Latest;
+            var sw = new Stopwatch();
+            sw.Start();
+
+            //Act
+            var awaiting = new AwaitingAck(messageId);
+            await Task.Delay(TimeSpan.FromMilliseconds(123));
+            sw.Stop();
+
+            //Assert
+            awaiting.Elapsed.Should().BeCloseTo(sw.Elapsed, 1);
+        }
+
+        [Fact]
+        public async void Test_Start_Message()

Review comment:
       The naming convention for tests is Method/Feature_[Given]State_[Should]ExpectedResult.
   E.g., from AsyncLockTests:
   Dispose_GivenLockIsDisposedWhileItIsTaken_ShouldNotCompleteBeforeItIsReleased()

##########
File path: tests/DotPulsar.Tests/Internal/MessageAcksTrackerTests.cs
##########
@@ -0,0 +1,172 @@
+namespace DotPulsar.Tests.Internal
+{
+    using DotPulsar.Internal;
+    using DotPulsar.Abstractions;
+    using FluentAssertions;
+    using Xunit;
+    using System;
+    using AutoFixture;
+    using System.Threading;
+    using System.Threading.Tasks;
+    using System.Collections.Generic;
+    using System.Linq;
+    using System.Linq.Expressions;
+    using AutoFixture.AutoNSubstitute;
+    using NSubstitute;
+    using System.Diagnostics;
+
+    public class UnackedMessageTrackerTests
+    {
+        [Fact]
+        public void Test_Instance()
+        {
+            var tracker = new UnackedMessageTracker(TimeSpan.FromSeconds(10), TimeSpan.FromSeconds(1));
+            tracker.Should().BeOfType<UnackedMessageTracker>();
+        }
+
+
+        [Fact]
+        public async void Test_AwaitingAck_Elapsed()
+        {
+            //Arrange
+            var messageId = MessageId.Latest;
+            var sw = new Stopwatch();

Review comment:
       A stopwatch can be created and started in one go: Stopwatch.StartNew();

##########
File path: tests/DotPulsar.Tests/Internal/MessageAcksTrackerTests.cs
##########
@@ -0,0 +1,172 @@
+namespace DotPulsar.Tests.Internal
+{
+    using DotPulsar.Internal;
+    using DotPulsar.Abstractions;
+    using FluentAssertions;
+    using Xunit;
+    using System;
+    using AutoFixture;
+    using System.Threading;
+    using System.Threading.Tasks;
+    using System.Collections.Generic;
+    using System.Linq;
+    using System.Linq.Expressions;
+    using AutoFixture.AutoNSubstitute;
+    using NSubstitute;
+    using System.Diagnostics;
+
+    public class UnackedMessageTrackerTests
+    {
+        [Fact]
+        public void Test_Instance()
+        {
+            var tracker = new UnackedMessageTracker(TimeSpan.FromSeconds(10), TimeSpan.FromSeconds(1));
+            tracker.Should().BeOfType<UnackedMessageTracker>();
+        }
+

Review comment:
       Unnecessary double spacing. Just need a single space.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar-dotpulsar] jbvanzuylen commented on pull request #67: Support for (optional) ack timeout and nack delay for consumers

Posted by GitBox <gi...@apache.org>.
jbvanzuylen commented on pull request #67:
URL: https://github.com/apache/pulsar-dotpulsar/pull/67#issuecomment-877236565


   @blankensteiner @dionjansen any idea when this work will be finished and merged? Looks like a lot of work has been done and reviewed but wondering what is missing to cross the finish line.

