You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by bl...@apache.org on 2020/06/28 16:37:41 UTC

[pulsar-dotpulsar] branch master updated: Redeliver unacknowledged messages (#47)

This is an automated email from the ASF dual-hosted git repository.

blankensteiner pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-dotpulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 17f95aa  Redeliver unacknowledged messages (#47)
17f95aa is described below

commit 17f95aa1c9c0b264031f8e0a9639842082ddd739
Author: Dion Jansen <di...@wlnss.com>
AuthorDate: Sun Jun 28 18:37:31 2020 +0200

    Redeliver unacknowledged messages (#47)
    
    * Dockerise samples
    
    * Added basecommand extension for RedeliverUnacknowledgedMessages
    
    * Added send overloads to IConsumerChannel and IConnection for CommandRedeliverUnacknowledgedMessages
    
    * Connection implementation
    
    * Added implementation for Send CommandRedeliverUnacknowledgedMessages in consumerchannel
    
    * Implementation for Send CommandRedeliverUnacknowledgedMessages in NotReadyChannel
    
    * Added public API for RedeliverUnacknowledgedMessages with given message Ids or none
    
    * Reverted changes to samples
    
    * Indent
    
    * IEnumerable for consumer API
    
    * Use IEnumerable in consumer impl
    
    * New enumerable empty rather than list
    
    * ConfigureAwait's for Consumer.RedeliverUnacknowledgedMessages
---
 .gitignore                                           |  4 ++++
 src/DotPulsar/Abstractions/IConsumer.cs              | 10 ++++++++++
 src/DotPulsar/Internal/Abstractions/IConnection.cs   |  1 +
 .../Internal/Abstractions/IConsumerChannel.cs        |  1 +
 src/DotPulsar/Internal/Connection.cs                 |  3 +++
 src/DotPulsar/Internal/Consumer.cs                   | 20 ++++++++++++++++++++
 src/DotPulsar/Internal/ConsumerChannel.cs            |  6 ++++++
 .../Internal/Extensions/CommandExtensions.cs         |  7 +++++++
 src/DotPulsar/Internal/NotReadyChannel.cs            |  3 +++
 9 files changed, 55 insertions(+)

diff --git a/.gitignore b/.gitignore
index 35c74e6..abe8c84 100644
--- a/.gitignore
+++ b/.gitignore
@@ -287,3 +287,7 @@ Icon
 Network Trash Folder
 Temporary Items
 .apdisk
+
+# VSCode
+.devcontainer
+.vscode
\ No newline at end of file
diff --git a/src/DotPulsar/Abstractions/IConsumer.cs b/src/DotPulsar/Abstractions/IConsumer.cs
index e818a9c..ef060f4 100644
--- a/src/DotPulsar/Abstractions/IConsumer.cs
+++ b/src/DotPulsar/Abstractions/IConsumer.cs
@@ -106,5 +106,15 @@ namespace DotPulsar.Abstractions
         /// Unsubscribe the consumer.
         /// </summary>
         ValueTask Unsubscribe(CancellationToken cancellationToken = default);
+
+        /// <summary>
+        /// Redeliver the pending messages identified by <paramref name="messageIds"/> that were pushed to this consumer and are not yet acknowledged.
+        /// </summary>
+        ValueTask RedeliverUnacknowledgedMessages(IEnumerable<MessageId> messageIds, CancellationToken cancellationToken = default);
+
+        /// <summary>
+        /// Redeliver all pending messages that were pushed to this consumer that are not yet acknowledged.
+        /// </summary>
+        ValueTask RedeliverUnacknowledgedMessages(CancellationToken cancellationToken = default);
     }
 }
diff --git a/src/DotPulsar/Internal/Abstractions/IConnection.cs b/src/DotPulsar/Internal/Abstractions/IConnection.cs
index 1adbe8f..f4abd8b 100644
--- a/src/DotPulsar/Internal/Abstractions/IConnection.cs
+++ b/src/DotPulsar/Internal/Abstractions/IConnection.cs
@@ -30,6 +30,7 @@ namespace DotPulsar.Internal.Abstractions
         Task Send(CommandPong command, CancellationToken cancellationToken);
         Task Send(CommandAck command, CancellationToken cancellationToken);
         Task Send(CommandFlow command, CancellationToken cancellationToken);
+        Task Send(CommandRedeliverUnacknowledgedMessages command, CancellationToken cancellationToken);
 
         Task<BaseCommand> Send(CommandUnsubscribe command, CancellationToken cancellationToken);
         Task<BaseCommand> Send(CommandConnect command, CancellationToken cancellationToken);
diff --git a/src/DotPulsar/Internal/Abstractions/IConsumerChannel.cs b/src/DotPulsar/Internal/Abstractions/IConsumerChannel.cs
index d398cd8..a86ff5a 100644
--- a/src/DotPulsar/Internal/Abstractions/IConsumerChannel.cs
+++ b/src/DotPulsar/Internal/Abstractions/IConsumerChannel.cs
@@ -22,6 +22,7 @@ namespace DotPulsar.Internal.Abstractions
     public interface IConsumerChannel : IAsyncDisposable
     {
         Task Send(CommandAck command, CancellationToken cancellationToken);
+        Task Send(CommandRedeliverUnacknowledgedMessages command, CancellationToken cancellationToken);
         Task<CommandSuccess> Send(CommandUnsubscribe command, CancellationToken cancellationToken);
         Task<CommandSuccess> Send(CommandSeek command, CancellationToken cancellationToken);
         Task<CommandGetLastMessageIdResponse> Send(CommandGetLastMessageId command, CancellationToken cancellationToken);
diff --git a/src/DotPulsar/Internal/Connection.cs b/src/DotPulsar/Internal/Connection.cs
index 2582e73..989aa36 100644
--- a/src/DotPulsar/Internal/Connection.cs
+++ b/src/DotPulsar/Internal/Connection.cs
@@ -97,6 +97,9 @@ namespace DotPulsar.Internal
         public Task Send(CommandFlow command, CancellationToken cancellationToken)
             => Send(command.AsBaseCommand(), cancellationToken);
 
+        public Task Send(CommandRedeliverUnacknowledgedMessages command, CancellationToken cancellationToken)
+            => Send(command.AsBaseCommand(), cancellationToken);
+
         public async Task<BaseCommand> Send(CommandUnsubscribe command, CancellationToken cancellationToken)
         {
             ThrowIfDisposed();
diff --git a/src/DotPulsar/Internal/Consumer.cs b/src/DotPulsar/Internal/Consumer.cs
index 80e6369..fe5fce3 100644
--- a/src/DotPulsar/Internal/Consumer.cs
+++ b/src/DotPulsar/Internal/Consumer.cs
@@ -21,6 +21,7 @@ namespace DotPulsar.Internal
     using PulsarApi;
     using System;
     using System.Collections.Generic;
+    using System.Linq;
     using System.Runtime.CompilerServices;
     using System.Threading;
     using System.Threading.Tasks;
@@ -31,6 +32,7 @@ namespace DotPulsar.Internal
         private readonly IRegisterEvent _eventRegister;
         private IConsumerChannel _channel;
         private readonly CommandAck _cachedCommandAck;
+        private readonly CommandRedeliverUnacknowledgedMessages _cachedCommandRedeliverUnacknowledgedMessages;
         private readonly IExecute _executor;
         private readonly IStateChanged<ConsumerState> _state;
         private int _isDisposed;
@@ -52,6 +54,7 @@ namespace DotPulsar.Internal
             _executor = executor;
             _state = state;
             _cachedCommandAck = new CommandAck();
+            _cachedCommandRedeliverUnacknowledgedMessages = new CommandRedeliverUnacknowledgedMessages();
             _isDisposed = 0;
 
             _eventRegister.Register(new ConsumerCreated(_correlationId, this));
@@ -104,6 +107,12 @@ namespace DotPulsar.Internal
         public async ValueTask AcknowledgeCumulative(MessageId messageId, CancellationToken cancellationToken)
             => await Acknowledge(messageId.Data, CommandAck.AckType.Cumulative, cancellationToken).ConfigureAwait(false);
 
+        public async ValueTask RedeliverUnacknowledgedMessages(IEnumerable<MessageId> messageIds, CancellationToken cancellationToken)
+            => await RedeliverUnacknowledgedMessages(messageIds.Select(m => m.Data).ToList(), cancellationToken).ConfigureAwait(false);
+
+        public async ValueTask RedeliverUnacknowledgedMessages(CancellationToken cancellationToken)
+            => await RedeliverUnacknowledgedMessages(Enumerable.Empty<MessageId>(), cancellationToken).ConfigureAwait(false);
+
         public async ValueTask Unsubscribe(CancellationToken cancellationToken)
         {
             ThrowIfDisposed();
@@ -145,6 +154,17 @@ namespace DotPulsar.Internal
             }, cancellationToken).ConfigureAwait(false);
         }
 
+        private async ValueTask RedeliverUnacknowledgedMessages(List<MessageIdData> messageIds, CancellationToken cancellationToken)
+        {
+            ThrowIfDisposed();
+
+            await _executor.Execute(() =>
+            {
+                _cachedCommandRedeliverUnacknowledgedMessages.MessageIds.Clear(); // the command object is reused across calls, so drop ids left over from the previous one
+                _cachedCommandRedeliverUnacknowledgedMessages.MessageIds.AddRange(messageIds); // fill the redeliver command itself, not the cached ack command
+                return _channel.Send(_cachedCommandRedeliverUnacknowledgedMessages, cancellationToken);
+            }, cancellationToken).ConfigureAwait(false);
+        }
         internal async ValueTask SetChannel(IConsumerChannel channel)
         {
             if (_isDisposed != 0)
diff --git a/src/DotPulsar/Internal/ConsumerChannel.cs b/src/DotPulsar/Internal/ConsumerChannel.cs
index e0768c6..c08649d 100644
--- a/src/DotPulsar/Internal/ConsumerChannel.cs
+++ b/src/DotPulsar/Internal/ConsumerChannel.cs
@@ -105,6 +105,12 @@ namespace DotPulsar.Internal
             await _connection.Send(command, cancellationToken).ConfigureAwait(false);
         }
 
+        public async Task Send(CommandRedeliverUnacknowledgedMessages command, CancellationToken cancellationToken)
+        {
+            command.ConsumerId = _id; // stamp this channel's _id onto the command before it goes out
+            await _connection.Send(command, cancellationToken).ConfigureAwait(false); // forward to the connection; no response is returned to the caller
+        }
+
         public async Task<CommandSuccess> Send(CommandUnsubscribe command, CancellationToken cancellationToken)
         {
             command.ConsumerId = _id;
diff --git a/src/DotPulsar/Internal/Extensions/CommandExtensions.cs b/src/DotPulsar/Internal/Extensions/CommandExtensions.cs
index f72fa31..4b182dc 100644
--- a/src/DotPulsar/Internal/Extensions/CommandExtensions.cs
+++ b/src/DotPulsar/Internal/Extensions/CommandExtensions.cs
@@ -166,5 +166,12 @@ namespace DotPulsar.Internal.Extensions
                 CommandType = BaseCommand.Type.Seek,
                 Seek = command
             };
+        
+        public static BaseCommand AsBaseCommand(this CommandRedeliverUnacknowledgedMessages command) // wraps the command in a new BaseCommand
+            => new BaseCommand
+            {
+                CommandType = BaseCommand.Type.RedeliverUnacknowledgedMessages, // type tag that corresponds to the payload property set on the next line
+                RedeliverUnacknowledgedMessages = command
+            };
     }
 }
diff --git a/src/DotPulsar/Internal/NotReadyChannel.cs b/src/DotPulsar/Internal/NotReadyChannel.cs
index aec3f34..93f572a 100644
--- a/src/DotPulsar/Internal/NotReadyChannel.cs
+++ b/src/DotPulsar/Internal/NotReadyChannel.cs
@@ -39,6 +39,9 @@ namespace DotPulsar.Internal
         public Task Send(CommandAck command, CancellationToken cancellationToken)
             => throw GetException();
 
+        public Task Send(CommandRedeliverUnacknowledgedMessages command, CancellationToken cancellationToken)
+            => throw GetException();
+
         public Task<CommandSuccess> Send(CommandUnsubscribe command, CancellationToken cancellationToken)
             => throw GetException();