You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by bl...@apache.org on 2023/03/08 09:20:45 UTC
[pulsar-dotpulsar] branch master updated: 'ReplicateSubscriptionState' can now be set when creating a consumer
This is an automated email from the ASF dual-hosted git repository.
blankensteiner pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-dotpulsar.git
The following commit(s) were added to refs/heads/master by this push:
new fa7817d 'ReplicateSubscriptionState' can now be set when creating a consumer
fa7817d is described below
commit fa7817d6ac1d1bfe13f1884005bd9264afd1dcf1
Author: Daniel Blankensteiner <db...@vmail.dk>
AuthorDate: Wed Mar 8 10:20:36 2023 +0100
'ReplicateSubscriptionState' can now be set when creating a consumer
---
CHANGELOG.md | 6 ++++++
src/DotPulsar/Abstractions/IConsumerBuilder.cs | 5 +++++
src/DotPulsar/ConsumerOptions.cs | 11 +++++++++++
src/DotPulsar/Internal/ConsumerBuilder.cs | 9 +++++++++
src/DotPulsar/PulsarClient.cs | 1 +
5 files changed, 32 insertions(+)
diff --git a/CHANGELOG.md b/CHANGELOG.md
index 257b35f..8c9f57f 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -4,6 +4,12 @@ All notable changes to this project will be documented in this file.
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/) and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).
+## [Unreleased]
+
+### Added
+
+- 'ReplicateSubscriptionState' can now be set when creating a consumer. The default is 'false'
+
## [2.10.2] - 2023-02-17
### Fixed
diff --git a/src/DotPulsar/Abstractions/IConsumerBuilder.cs b/src/DotPulsar/Abstractions/IConsumerBuilder.cs
index c4dc94d..d28c0f5 100644
--- a/src/DotPulsar/Abstractions/IConsumerBuilder.cs
+++ b/src/DotPulsar/Abstractions/IConsumerBuilder.cs
@@ -44,6 +44,11 @@ public interface IConsumerBuilder<TMessage>
/// </summary>
IConsumerBuilder<TMessage> ReadCompacted(bool readCompacted);
+ /// <summary>
+ /// Whether to replicate the subscription's state across clusters (when using geo-replication). The default is 'false'.
+ /// </summary>
+ IConsumerBuilder<TMessage> ReplicateSubscriptionState(bool replicateSubscriptionState);
+
/// <summary>
/// Register a state changed handler.
/// </summary>
diff --git a/src/DotPulsar/ConsumerOptions.cs b/src/DotPulsar/ConsumerOptions.cs
index 3e9d112..977f184 100644
--- a/src/DotPulsar/ConsumerOptions.cs
+++ b/src/DotPulsar/ConsumerOptions.cs
@@ -42,6 +42,11 @@ public sealed class ConsumerOptions<TMessage>
/// </summary>
public static readonly bool DefaultReadCompacted = false;
+ /// <summary>
+ /// The default of whether to replicate the subscription's state.
+ /// </summary>
+ public static readonly bool DefaultReplicateSubscriptionState = false;
+
/// <summary>
/// The default subscription type.
/// </summary>
@@ -56,6 +61,7 @@ public sealed class ConsumerOptions<TMessage>
PriorityLevel = DefaultPriorityLevel;
MessagePrefetchCount = DefaultMessagePrefetchCount;
ReadCompacted = DefaultReadCompacted;
+ ReplicateSubscriptionState = DefaultReplicateSubscriptionState;
SubscriptionType = DefaultSubscriptionType;
SubscriptionProperties = new Dictionary<string, string>();
SubscriptionName = subscriptionName;
@@ -88,6 +94,11 @@ public sealed class ConsumerOptions<TMessage>
/// </summary>
public bool ReadCompacted { get; set; }
+ /// <summary>
+ /// Whether to replicate the subscription's state across clusters (when using geo-replication). The default is 'false'.
+ /// </summary>
+ public bool ReplicateSubscriptionState { get; set; }
+
/// <summary>
/// Set the schema. This is required.
/// </summary>
diff --git a/src/DotPulsar/Internal/ConsumerBuilder.cs b/src/DotPulsar/Internal/ConsumerBuilder.cs
index cf85cc1..49267af 100644
--- a/src/DotPulsar/Internal/ConsumerBuilder.cs
+++ b/src/DotPulsar/Internal/ConsumerBuilder.cs
@@ -27,6 +27,7 @@ public sealed class ConsumerBuilder<TMessage> : IConsumerBuilder<TMessage>
private int _priorityLevel;
private uint _messagePrefetchCount;
private bool _readCompacted;
+ private bool _replicateSubscriptionState;
private string? _subscriptionName;
private readonly Dictionary<string, string> _subscriptionProperties;
private SubscriptionType _subscriptionType;
@@ -41,6 +42,7 @@ public sealed class ConsumerBuilder<TMessage> : IConsumerBuilder<TMessage>
_priorityLevel = ConsumerOptions<TMessage>.DefaultPriorityLevel;
_messagePrefetchCount = ConsumerOptions<TMessage>.DefaultMessagePrefetchCount;
_readCompacted = ConsumerOptions<TMessage>.DefaultReadCompacted;
+ _replicateSubscriptionState = ConsumerOptions<TMessage>.DefaultReplicateSubscriptionState;
_subscriptionProperties = new Dictionary<string, string>();
_subscriptionType = ConsumerOptions<TMessage>.DefaultSubscriptionType;
}
@@ -75,6 +77,12 @@ public sealed class ConsumerBuilder<TMessage> : IConsumerBuilder<TMessage>
return this;
}
+ public IConsumerBuilder<TMessage> ReplicateSubscriptionState(bool replicateSubscriptionState)
+ {
+ _replicateSubscriptionState = replicateSubscriptionState;
+ return this;
+ }
+
public IConsumerBuilder<TMessage> StateChangedHandler(IHandleStateChanged<ConsumerStateChanged> handler)
{
_stateChangedHandler = handler;
@@ -120,6 +128,7 @@ public sealed class ConsumerBuilder<TMessage> : IConsumerBuilder<TMessage>
MessagePrefetchCount = _messagePrefetchCount,
PriorityLevel = _priorityLevel,
ReadCompacted = _readCompacted,
+ ReplicateSubscriptionState = _replicateSubscriptionState,
StateChangedHandler = _stateChangedHandler,
SubscriptionProperties = _subscriptionProperties,
SubscriptionType = _subscriptionType
diff --git a/src/DotPulsar/PulsarClient.cs b/src/DotPulsar/PulsarClient.cs
index 5a0d8e1..66284ef 100644
--- a/src/DotPulsar/PulsarClient.cs
+++ b/src/DotPulsar/PulsarClient.cs
@@ -97,6 +97,7 @@ public sealed class PulsarClient : IPulsarClient
InitialPosition = (CommandSubscribe.InitialPositionType) options.InitialPosition,
PriorityLevel = options.PriorityLevel,
ReadCompacted = options.ReadCompacted,
+ ReplicateSubscriptionState = options.ReplicateSubscriptionState,
Subscription = options.SubscriptionName,
Topic = options.Topic,
Type = (CommandSubscribe.SubType) options.SubscriptionType