You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by bl...@apache.org on 2020/12/12 23:15:52 UTC
[pulsar-dotpulsar] branch master updated: Implementing seeking on
the message publish time from both the consumer and reader
This is an automated email from the ASF dual-hosted git repository.
blankensteiner pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-dotpulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 56f4f09 Implementing seeking on the message publish time from both the consumer and reader
56f4f09 is described below
commit 56f4f0998f1afe6bfa2f35a208de2dfed6a89979
Author: Daniel Blankensteiner <db...@vmail.dk>
AuthorDate: Sun Dec 13 00:15:38 2020 +0100
Implementing seeking on the message publish time from both the consumer and reader
---
src/DotPulsar/Abstractions/IConsumer.cs | 17 ++++++++++-
src/DotPulsar/Abstractions/IReader.cs | 20 +++++++++++++
.../Internal/Abstractions/IReaderChannel.cs | 2 ++
src/DotPulsar/Internal/Consumer.cs | 34 ++++++++++++++++------
src/DotPulsar/Internal/Reader.cs | 33 +++++++++++++++++++++
5 files changed, 96 insertions(+), 10 deletions(-)
diff --git a/src/DotPulsar/Abstractions/IConsumer.cs b/src/DotPulsar/Abstractions/IConsumer.cs
index ef060f4..48fe671 100644
--- a/src/DotPulsar/Abstractions/IConsumer.cs
+++ b/src/DotPulsar/Abstractions/IConsumer.cs
@@ -66,7 +66,7 @@ namespace DotPulsar.Abstractions
bool IsFinalState(ConsumerState state);
/// <summary>
- /// Get an IAsyncEnumerable for consuming messages
+ /// Get an IAsyncEnumerable for consuming messages.
/// </summary>
IAsyncEnumerable<Message> Messages(CancellationToken cancellationToken = default);
@@ -76,6 +76,21 @@ namespace DotPulsar.Abstractions
ValueTask Seek(MessageId messageId, CancellationToken cancellationToken = default);
/// <summary>
+ /// Reset the subscription associated with this consumer to a specific message publish time using unix time in milliseconds.
+ /// </summary>
+ ValueTask Seek(ulong publishTime, CancellationToken cancellationToken = default);
+
+ /// <summary>
+ /// Reset the subscription associated with this consumer to a specific message publish time using a UTC DateTime.
+ /// </summary>
+ ValueTask Seek(DateTime publishTime, CancellationToken cancellationToken = default);
+
+ /// <summary>
+ /// Reset the subscription associated with this consumer to a specific message publish time using a DateTimeOffset.
+ /// </summary>
+ ValueTask Seek(DateTimeOffset publishTime, CancellationToken cancellationToken = default);
+
+ /// <summary>
/// Wait for the state to change to a specific state.
/// </summary>
/// <returns>
diff --git a/src/DotPulsar/Abstractions/IReader.cs b/src/DotPulsar/Abstractions/IReader.cs
index a44ea9c..7b9502a 100644
--- a/src/DotPulsar/Abstractions/IReader.cs
+++ b/src/DotPulsar/Abstractions/IReader.cs
@@ -46,6 +46,26 @@ namespace DotPulsar.Abstractions
IAsyncEnumerable<Message> Messages(CancellationToken cancellationToken = default);
/// <summary>
+ /// Reset the subscription associated with this reader to a specific MessageId.
+ /// </summary>
+ ValueTask Seek(MessageId messageId, CancellationToken cancellationToken = default);
+
+ /// <summary>
+ /// Reset the subscription associated with this reader to a specific message publish time using unix time in milliseconds.
+ /// </summary>
+ ValueTask Seek(ulong publishTime, CancellationToken cancellationToken = default);
+
+ /// <summary>
+ /// Reset the subscription associated with this reader to a specific message publish time using a UTC DateTime.
+ /// </summary>
+ ValueTask Seek(DateTime publishTime, CancellationToken cancellationToken = default);
+
+ /// <summary>
+ /// Reset the subscription associated with this reader to a specific message publish time using a DateTimeOffset.
+ /// </summary>
+ ValueTask Seek(DateTimeOffset publishTime, CancellationToken cancellationToken = default);
+
+ /// <summary>
/// Wait for the state to change to a specific state.
/// </summary>
/// <returns>
diff --git a/src/DotPulsar/Internal/Abstractions/IReaderChannel.cs b/src/DotPulsar/Internal/Abstractions/IReaderChannel.cs
index 64c2289..a63762b 100644
--- a/src/DotPulsar/Internal/Abstractions/IReaderChannel.cs
+++ b/src/DotPulsar/Internal/Abstractions/IReaderChannel.cs
@@ -14,12 +14,14 @@
namespace DotPulsar.Internal.Abstractions
{
+ using DotPulsar.Internal.PulsarApi;
using System;
using System.Threading;
using System.Threading.Tasks;
public interface IReaderChannel : IAsyncDisposable
{
+ Task<CommandSuccess> Send(CommandSeek command, CancellationToken cancellationToken);
ValueTask<Message> Receive(CancellationToken cancellationToken = default);
}
}
diff --git a/src/DotPulsar/Internal/Consumer.cs b/src/DotPulsar/Internal/Consumer.cs
index 33e807f..dad2416 100644
--- a/src/DotPulsar/Internal/Consumer.cs
+++ b/src/DotPulsar/Internal/Consumer.cs
@@ -124,11 +124,31 @@ namespace DotPulsar.Internal
{
ThrowIfDisposed();
- var seek = new CommandSeek
- {
- MessageId = messageId.Data
- };
+ var seek = new CommandSeek { MessageId = messageId.Data };
+ _ = await _executor.Execute(() => _channel.Send(seek, cancellationToken), cancellationToken).ConfigureAwait(false);
+ }
+
+ public async ValueTask Seek(ulong publishTime, CancellationToken cancellationToken)
+ {
+ ThrowIfDisposed();
+
+ var seek = new CommandSeek { MessagePublishTime = publishTime };
+ _ = await _executor.Execute(() => _channel.Send(seek, cancellationToken), cancellationToken).ConfigureAwait(false);
+ }
+
+ public async ValueTask Seek(DateTime publishTime, CancellationToken cancellationToken)
+ {
+ ThrowIfDisposed();
+
+ var seek = new CommandSeek { MessagePublishTime = (ulong) new DateTimeOffset(publishTime).ToUnixTimeMilliseconds() };
+ _ = await _executor.Execute(() => _channel.Send(seek, cancellationToken), cancellationToken).ConfigureAwait(false);
+ }
+ public async ValueTask Seek(DateTimeOffset publishTime, CancellationToken cancellationToken)
+ {
+ ThrowIfDisposed();
+
+ var seek = new CommandSeek { MessagePublishTime = (ulong) publishTime.ToUnixTimeMilliseconds() };
_ = await _executor.Execute(() => _channel.Send(seek, cancellationToken), cancellationToken).ConfigureAwait(false);
}
@@ -170,11 +190,7 @@ namespace DotPulsar.Internal
var redeliverUnacknowledgedMessages = new CommandRedeliverUnacknowledgedMessages();
redeliverUnacknowledgedMessages.MessageIds.AddRange(messageIds);
-
- await _executor.Execute(() =>
- {
- return _channel.Send(redeliverUnacknowledgedMessages, cancellationToken);
- }, cancellationToken).ConfigureAwait(false);
+ await _executor.Execute(() => _channel.Send(redeliverUnacknowledgedMessages, cancellationToken), cancellationToken).ConfigureAwait(false);
}
internal async ValueTask SetChannel(IConsumerChannel channel)
diff --git a/src/DotPulsar/Internal/Reader.cs b/src/DotPulsar/Internal/Reader.cs
index 82f0e91..3dccb73 100644
--- a/src/DotPulsar/Internal/Reader.cs
+++ b/src/DotPulsar/Internal/Reader.cs
@@ -17,6 +17,7 @@ namespace DotPulsar.Internal
using Abstractions;
using DotPulsar.Abstractions;
using DotPulsar.Exceptions;
+ using DotPulsar.Internal.PulsarApi;
using Events;
using System;
using System.Collections.Generic;
@@ -80,6 +81,38 @@ namespace DotPulsar.Internal
yield return await _executor.Execute(() => _channel.Receive(cancellationToken), cancellationToken).ConfigureAwait(false);
}
+ public async ValueTask Seek(MessageId messageId, CancellationToken cancellationToken)
+ {
+ ThrowIfDisposed();
+
+ var seek = new CommandSeek { MessageId = messageId.Data };
+ _ = await _executor.Execute(() => _channel.Send(seek, cancellationToken), cancellationToken).ConfigureAwait(false);
+ }
+
+ public async ValueTask Seek(ulong publishTime, CancellationToken cancellationToken)
+ {
+ ThrowIfDisposed();
+
+ var seek = new CommandSeek { MessagePublishTime = publishTime };
+ _ = await _executor.Execute(() => _channel.Send(seek, cancellationToken), cancellationToken).ConfigureAwait(false);
+ }
+
+ public async ValueTask Seek(DateTime publishTime, CancellationToken cancellationToken)
+ {
+ ThrowIfDisposed();
+
+ var seek = new CommandSeek { MessagePublishTime = (ulong) new DateTimeOffset(publishTime).ToUnixTimeMilliseconds() };
+ _ = await _executor.Execute(() => _channel.Send(seek, cancellationToken), cancellationToken).ConfigureAwait(false);
+ }
+
+ public async ValueTask Seek(DateTimeOffset publishTime, CancellationToken cancellationToken)
+ {
+ ThrowIfDisposed();
+
+ var seek = new CommandSeek { MessagePublishTime = (ulong) publishTime.ToUnixTimeMilliseconds() };
+ _ = await _executor.Execute(() => _channel.Send(seek, cancellationToken), cancellationToken).ConfigureAwait(false);
+ }
+
public async ValueTask DisposeAsync()
{
if (Interlocked.Exchange(ref _isDisposed, 1) != 0)