You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by bl...@apache.org on 2020/06/28 16:37:41 UTC
[pulsar-dotpulsar] branch master updated: Redeliver unacknowledged
messages (#47)
This is an automated email from the ASF dual-hosted git repository.
blankensteiner pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-dotpulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 17f95aa Redeliver unacknowledged messages (#47)
17f95aa is described below
commit 17f95aa1c9c0b264031f8e0a9639842082ddd739
Author: Dion Jansen <di...@wlnss.com>
AuthorDate: Sun Jun 28 18:37:31 2020 +0200
Redeliver unacknowledged messages (#47)
* Dockerise samples
* Added basecommand extension for RedeliverUnacknowledgedMessages
* Added send overloads to IConsumerChannel and IConnection for CommandRedeliverUnacknowledgedMessages
* Connection impelmentation
* Added implementation for Send CommandRedeliverUnacknowledgedMessages in consumerchannel
* Implementation for Send CommandRedeliverUnacknowledgedMessages in NotReadyChannel
* Added public API for RedeliverUnacknowledgedMessages with given message Ids or none
* Reverted changes to samples
* Indent
* IEnumerable for consumer API
* Use IEnumerable in consumer impl
* New enumerable empty rather than list
* ConfigureAwait's for Consumer.RedeliverUnacknowledgedMessages
---
.gitignore | 4 ++++
src/DotPulsar/Abstractions/IConsumer.cs | 10 ++++++++++
src/DotPulsar/Internal/Abstractions/IConnection.cs | 1 +
.../Internal/Abstractions/IConsumerChannel.cs | 1 +
src/DotPulsar/Internal/Connection.cs | 3 +++
src/DotPulsar/Internal/Consumer.cs | 20 ++++++++++++++++++++
src/DotPulsar/Internal/ConsumerChannel.cs | 6 ++++++
.../Internal/Extensions/CommandExtensions.cs | 7 +++++++
src/DotPulsar/Internal/NotReadyChannel.cs | 3 +++
9 files changed, 55 insertions(+)
diff --git a/.gitignore b/.gitignore
index 35c74e6..abe8c84 100644
--- a/.gitignore
+++ b/.gitignore
@@ -287,3 +287,7 @@ Icon
Network Trash Folder
Temporary Items
.apdisk
+
+# VSCode
+.devcontainer
+.vscode
\ No newline at end of file
diff --git a/src/DotPulsar/Abstractions/IConsumer.cs b/src/DotPulsar/Abstractions/IConsumer.cs
index e818a9c..ef060f4 100644
--- a/src/DotPulsar/Abstractions/IConsumer.cs
+++ b/src/DotPulsar/Abstractions/IConsumer.cs
@@ -106,5 +106,15 @@ namespace DotPulsar.Abstractions
/// Unsubscribe the consumer.
/// </summary>
ValueTask Unsubscribe(CancellationToken cancellationToken = default);
+
+ /// <summary>
+ /// Redeliver the pending messages that were pushed to this consumer that are not yet acknowledged.
+ /// </summary>
+ ValueTask RedeliverUnacknowledgedMessages(IEnumerable<MessageId> messageIds, CancellationToken cancellationToken);
+
+ /// <summary>
+ /// Redeliver all pending messages that were pushed to this consumer that are not yet acknowledged.
+ /// </summary>
+ ValueTask RedeliverUnacknowledgedMessages(CancellationToken cancellationToken);
}
}
diff --git a/src/DotPulsar/Internal/Abstractions/IConnection.cs b/src/DotPulsar/Internal/Abstractions/IConnection.cs
index 1adbe8f..f4abd8b 100644
--- a/src/DotPulsar/Internal/Abstractions/IConnection.cs
+++ b/src/DotPulsar/Internal/Abstractions/IConnection.cs
@@ -30,6 +30,7 @@ namespace DotPulsar.Internal.Abstractions
Task Send(CommandPong command, CancellationToken cancellationToken);
Task Send(CommandAck command, CancellationToken cancellationToken);
Task Send(CommandFlow command, CancellationToken cancellationToken);
+ Task Send(CommandRedeliverUnacknowledgedMessages command, CancellationToken cancellationToken);
Task<BaseCommand> Send(CommandUnsubscribe command, CancellationToken cancellationToken);
Task<BaseCommand> Send(CommandConnect command, CancellationToken cancellationToken);
diff --git a/src/DotPulsar/Internal/Abstractions/IConsumerChannel.cs b/src/DotPulsar/Internal/Abstractions/IConsumerChannel.cs
index d398cd8..a86ff5a 100644
--- a/src/DotPulsar/Internal/Abstractions/IConsumerChannel.cs
+++ b/src/DotPulsar/Internal/Abstractions/IConsumerChannel.cs
@@ -22,6 +22,7 @@ namespace DotPulsar.Internal.Abstractions
public interface IConsumerChannel : IAsyncDisposable
{
Task Send(CommandAck command, CancellationToken cancellationToken);
+ Task Send(CommandRedeliverUnacknowledgedMessages command, CancellationToken cancellationToken);
Task<CommandSuccess> Send(CommandUnsubscribe command, CancellationToken cancellationToken);
Task<CommandSuccess> Send(CommandSeek command, CancellationToken cancellationToken);
Task<CommandGetLastMessageIdResponse> Send(CommandGetLastMessageId command, CancellationToken cancellationToken);
diff --git a/src/DotPulsar/Internal/Connection.cs b/src/DotPulsar/Internal/Connection.cs
index 2582e73..989aa36 100644
--- a/src/DotPulsar/Internal/Connection.cs
+++ b/src/DotPulsar/Internal/Connection.cs
@@ -97,6 +97,9 @@ namespace DotPulsar.Internal
public Task Send(CommandFlow command, CancellationToken cancellationToken)
=> Send(command.AsBaseCommand(), cancellationToken);
+ public Task Send(CommandRedeliverUnacknowledgedMessages command, CancellationToken cancellationToken)
+ => Send(command.AsBaseCommand(), cancellationToken);
+
public async Task<BaseCommand> Send(CommandUnsubscribe command, CancellationToken cancellationToken)
{
ThrowIfDisposed();
diff --git a/src/DotPulsar/Internal/Consumer.cs b/src/DotPulsar/Internal/Consumer.cs
index 80e6369..fe5fce3 100644
--- a/src/DotPulsar/Internal/Consumer.cs
+++ b/src/DotPulsar/Internal/Consumer.cs
@@ -21,6 +21,7 @@ namespace DotPulsar.Internal
using PulsarApi;
using System;
using System.Collections.Generic;
+ using System.Linq;
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Tasks;
@@ -31,6 +32,7 @@ namespace DotPulsar.Internal
private readonly IRegisterEvent _eventRegister;
private IConsumerChannel _channel;
private readonly CommandAck _cachedCommandAck;
+ private readonly CommandRedeliverUnacknowledgedMessages _cachedCommandRedeliverUnacknowledgedMessages;
private readonly IExecute _executor;
private readonly IStateChanged<ConsumerState> _state;
private int _isDisposed;
@@ -52,6 +54,7 @@ namespace DotPulsar.Internal
_executor = executor;
_state = state;
_cachedCommandAck = new CommandAck();
+ _cachedCommandRedeliverUnacknowledgedMessages = new CommandRedeliverUnacknowledgedMessages();
_isDisposed = 0;
_eventRegister.Register(new ConsumerCreated(_correlationId, this));
@@ -104,6 +107,12 @@ namespace DotPulsar.Internal
public async ValueTask AcknowledgeCumulative(MessageId messageId, CancellationToken cancellationToken)
=> await Acknowledge(messageId.Data, CommandAck.AckType.Cumulative, cancellationToken).ConfigureAwait(false);
+ public async ValueTask RedeliverUnacknowledgedMessages(IEnumerable<MessageId> messageIds, CancellationToken cancellationToken)
+ => await RedeliverUnacknowledgedMessages(messageIds.Select(m => m.Data).ToList(), cancellationToken).ConfigureAwait(false);
+
+ public async ValueTask RedeliverUnacknowledgedMessages(CancellationToken cancellationToken)
+ => await RedeliverUnacknowledgedMessages(Enumerable.Empty<MessageId>(), cancellationToken).ConfigureAwait(false);
+
public async ValueTask Unsubscribe(CancellationToken cancellationToken)
{
ThrowIfDisposed();
@@ -145,6 +154,17 @@ namespace DotPulsar.Internal
}, cancellationToken).ConfigureAwait(false);
}
+ private async ValueTask RedeliverUnacknowledgedMessages(List<MessageIdData> messageIds, CancellationToken cancellationToken)
+ {
+ ThrowIfDisposed();
+
+ await _executor.Execute(() =>
+ {
+ _cachedCommandRedeliverUnacknowledgedMessages.MessageIds.Clear();
+ _cachedCommandAck.MessageIds.AddRange(messageIds);
+ return _channel.Send(_cachedCommandRedeliverUnacknowledgedMessages, cancellationToken);
+ }, cancellationToken).ConfigureAwait(false);
+ }
internal async ValueTask SetChannel(IConsumerChannel channel)
{
if (_isDisposed != 0)
diff --git a/src/DotPulsar/Internal/ConsumerChannel.cs b/src/DotPulsar/Internal/ConsumerChannel.cs
index e0768c6..c08649d 100644
--- a/src/DotPulsar/Internal/ConsumerChannel.cs
+++ b/src/DotPulsar/Internal/ConsumerChannel.cs
@@ -105,6 +105,12 @@ namespace DotPulsar.Internal
await _connection.Send(command, cancellationToken).ConfigureAwait(false);
}
+ public async Task Send(CommandRedeliverUnacknowledgedMessages command, CancellationToken cancellationToken)
+ {
+ command.ConsumerId = _id;
+ await _connection.Send(command, cancellationToken).ConfigureAwait(false);
+ }
+
public async Task<CommandSuccess> Send(CommandUnsubscribe command, CancellationToken cancellationToken)
{
command.ConsumerId = _id;
diff --git a/src/DotPulsar/Internal/Extensions/CommandExtensions.cs b/src/DotPulsar/Internal/Extensions/CommandExtensions.cs
index f72fa31..4b182dc 100644
--- a/src/DotPulsar/Internal/Extensions/CommandExtensions.cs
+++ b/src/DotPulsar/Internal/Extensions/CommandExtensions.cs
@@ -166,5 +166,12 @@ namespace DotPulsar.Internal.Extensions
CommandType = BaseCommand.Type.Seek,
Seek = command
};
+
+ public static BaseCommand AsBaseCommand(this CommandRedeliverUnacknowledgedMessages command)
+ => new BaseCommand
+ {
+ CommandType = BaseCommand.Type.RedeliverUnacknowledgedMessages,
+ RedeliverUnacknowledgedMessages = command
+ };
}
}
diff --git a/src/DotPulsar/Internal/NotReadyChannel.cs b/src/DotPulsar/Internal/NotReadyChannel.cs
index aec3f34..93f572a 100644
--- a/src/DotPulsar/Internal/NotReadyChannel.cs
+++ b/src/DotPulsar/Internal/NotReadyChannel.cs
@@ -39,6 +39,9 @@ namespace DotPulsar.Internal
public Task Send(CommandAck command, CancellationToken cancellationToken)
=> throw GetException();
+ public Task Send(CommandRedeliverUnacknowledgedMessages command, CancellationToken cancellationToken)
+ => throw GetException();
+
public Task<CommandSuccess> Send(CommandUnsubscribe command, CancellationToken cancellationToken)
=> throw GetException();