You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by bl...@apache.org on 2020/06/22 12:28:02 UTC
[pulsar-dotpulsar] branch master updated: Expose 'RedeliveryCount'
as requested in #41
This is an automated email from the ASF dual-hosted git repository.
blankensteiner pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-dotpulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 97f8b70 Expose 'RedeliveryCount' as requested in #41
97f8b70 is described below
commit 97f8b70b1059b0bbdbf5df227c9f443d057eb716
Author: Daniel Blankensteiner <db...@danskecommodities.com>
AuthorDate: Mon Jun 22 14:27:48 2020 +0200
Expose 'RedeliveryCount' as requested in #41
---
src/DotPulsar/Internal/BatchHandler.cs | 4 ++--
src/DotPulsar/Internal/ChannelManager.cs | 2 +-
src/DotPulsar/Internal/ConsumerChannel.cs | 5 +++--
src/DotPulsar/Internal/MessagePackage.cs | 4 +++-
src/DotPulsar/Message.cs | 3 +++
5 files changed, 12 insertions(+), 6 deletions(-)
diff --git a/src/DotPulsar/Internal/BatchHandler.cs b/src/DotPulsar/Internal/BatchHandler.cs
index 8209956..cb6ab13 100644
--- a/src/DotPulsar/Internal/BatchHandler.cs
+++ b/src/DotPulsar/Internal/BatchHandler.cs
@@ -33,7 +33,7 @@ namespace DotPulsar.Internal
_batches = new LinkedList<Batch>();
}
- public Message Add(MessageIdData messageId, MessageMetadata metadata, ReadOnlySequence<byte> data)
+ public Message Add(MessageIdData messageId, uint redeliveryCount, MessageMetadata metadata, ReadOnlySequence<byte> data)
{
if (_trackBatches)
_batches.AddLast(new Batch(messageId, metadata.NumMessagesInBatch));
@@ -47,7 +47,7 @@ namespace DotPulsar.Internal
var singleMetadata = Serializer.Deserialize<SingleMessageMetadata>(data.Slice(index, singleMetadataSize));
index += singleMetadataSize;
var singleMessageId = new MessageId(messageId.LedgerId, messageId.EntryId, messageId.Partition, i);
- var message = new Message(singleMessageId, metadata, singleMetadata, data.Slice(index, singleMetadata.PayloadSize));
+ var message = new Message(singleMessageId, redeliveryCount, metadata, singleMetadata, data.Slice(index, singleMetadata.PayloadSize));
_messages.Enqueue(message);
index += (uint) singleMetadata.PayloadSize;
}
diff --git a/src/DotPulsar/Internal/ChannelManager.cs b/src/DotPulsar/Internal/ChannelManager.cs
index 07c1d5a..d2eda4d 100644
--- a/src/DotPulsar/Internal/ChannelManager.cs
+++ b/src/DotPulsar/Internal/ChannelManager.cs
@@ -130,7 +130,7 @@ namespace DotPulsar.Internal
=> _consumerChannels[command.ConsumerId]?.ReachedEndOfTopic();
public void Incoming(CommandMessage command, ReadOnlySequence<byte> data)
- => _consumerChannels[command.ConsumerId]?.Received(new MessagePackage(command.MessageId, data));
+ => _consumerChannels[command.ConsumerId]?.Received(new MessagePackage(command.MessageId, command.RedeliveryCount, data));
public void Dispose()
{
diff --git a/src/DotPulsar/Internal/ConsumerChannel.cs b/src/DotPulsar/Internal/ConsumerChannel.cs
index cd0ba24..e0768c6 100644
--- a/src/DotPulsar/Internal/ConsumerChannel.cs
+++ b/src/DotPulsar/Internal/ConsumerChannel.cs
@@ -76,13 +76,14 @@ namespace DotPulsar.Internal
}
var metadataSize = messagePackage.GetMetadataSize();
+ var redeliveryCount = messagePackage.RedeliveryCount;
var data = messagePackage.ExtractData(metadataSize);
var metadata = messagePackage.ExtractMetadata(metadataSize);
var messageId = messagePackage.MessageId;
return metadata.NumMessagesInBatch == 1
- ? new Message(new MessageId(messageId), metadata, null, data)
- : _batchHandler.Add(messageId, metadata, data);
+ ? new Message(new MessageId(messageId), redeliveryCount, metadata, null, data)
+ : _batchHandler.Add(messageId, redeliveryCount, metadata, data);
}
}
diff --git a/src/DotPulsar/Internal/MessagePackage.cs b/src/DotPulsar/Internal/MessagePackage.cs
index b953635..ef0bb32 100644
--- a/src/DotPulsar/Internal/MessagePackage.cs
+++ b/src/DotPulsar/Internal/MessagePackage.cs
@@ -19,13 +19,15 @@ namespace DotPulsar.Internal
public readonly struct MessagePackage
{
- public MessagePackage(MessageIdData messageId, ReadOnlySequence<byte> data)
+ public MessagePackage(MessageIdData messageId, uint redeliveryCount, ReadOnlySequence<byte> data)
{
MessageId = messageId;
+ RedeliveryCount = redeliveryCount;
Data = data;
}
public MessageIdData MessageId { get; }
+ public uint RedeliveryCount { get; }
public ReadOnlySequence<byte> Data { get; }
}
}
diff --git a/src/DotPulsar/Message.cs b/src/DotPulsar/Message.cs
index 0e4dfac..cc462c0 100644
--- a/src/DotPulsar/Message.cs
+++ b/src/DotPulsar/Message.cs
@@ -27,11 +27,13 @@ namespace DotPulsar
internal Message(
MessageId messageId,
+ uint redeliveryCount,
Internal.PulsarApi.MessageMetadata metadata,
SingleMessageMetadata? singleMetadata,
ReadOnlySequence<byte> data)
{
MessageId = messageId;
+ RedeliveryCount = redeliveryCount;
ProducerName = metadata.ProducerName;
PublishTime = metadata.PublishTime;
Data = data;
@@ -60,6 +62,7 @@ namespace DotPulsar
public ReadOnlySequence<byte> Data { get; }
public string ProducerName { get; }
public ulong SequenceId { get; }
+ public uint RedeliveryCount { get; }
public bool HasEventTime => EventTime != 0;
public ulong EventTime { get; }