You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by bl...@apache.org on 2023/03/08 09:20:45 UTC

[pulsar-dotpulsar] branch master updated: 'ReplicateSubscriptionState' can now be set when creating a consumer

This is an automated email from the ASF dual-hosted git repository.

blankensteiner pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-dotpulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new fa7817d  'ReplicateSubscriptionState' can now be set when creating a consumer
fa7817d is described below

commit fa7817d6ac1d1bfe13f1884005bd9264afd1dcf1
Author: Daniel Blankensteiner <db...@vmail.dk>
AuthorDate: Wed Mar 8 10:20:36 2023 +0100

    'ReplicateSubscriptionState' can now be set when creating a consumer
---
 CHANGELOG.md                                   |  6 ++++++
 src/DotPulsar/Abstractions/IConsumerBuilder.cs |  5 +++++
 src/DotPulsar/ConsumerOptions.cs               | 11 +++++++++++
 src/DotPulsar/Internal/ConsumerBuilder.cs      |  9 +++++++++
 src/DotPulsar/PulsarClient.cs                  |  1 +
 5 files changed, 32 insertions(+)

diff --git a/CHANGELOG.md b/CHANGELOG.md
index 257b35f..8c9f57f 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -4,6 +4,12 @@ All notable changes to this project will be documented in this file.
 
 The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/) and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).
 
+## [Unreleased]
+
+### Added
+
+- 'ReplicateSubscriptionState' can now be set when creating a consumer. The default is 'false'
+
 ## [2.10.2] - 2023-02-17
 
 ### Fixed
diff --git a/src/DotPulsar/Abstractions/IConsumerBuilder.cs b/src/DotPulsar/Abstractions/IConsumerBuilder.cs
index c4dc94d..d28c0f5 100644
--- a/src/DotPulsar/Abstractions/IConsumerBuilder.cs
+++ b/src/DotPulsar/Abstractions/IConsumerBuilder.cs
@@ -44,6 +44,11 @@ public interface IConsumerBuilder<TMessage>
     /// </summary>
     IConsumerBuilder<TMessage> ReadCompacted(bool readCompacted);
 
+    /// <summary>
+    /// Whether to replicate the subscription's state across clusters (when using geo-replication). The default is 'false'.
+    /// </summary>
+    IConsumerBuilder<TMessage> ReplicateSubscriptionState(bool replicateSubscriptionState);
+
     /// <summary>
     /// Register a state changed handler.
     /// </summary>
diff --git a/src/DotPulsar/ConsumerOptions.cs b/src/DotPulsar/ConsumerOptions.cs
index 3e9d112..977f184 100644
--- a/src/DotPulsar/ConsumerOptions.cs
+++ b/src/DotPulsar/ConsumerOptions.cs
@@ -42,6 +42,11 @@ public sealed class ConsumerOptions<TMessage>
     /// </summary>
     public static readonly bool DefaultReadCompacted = false;
 
+    /// <summary>
+    /// The default of whether to replicate the subscription's state.
+    /// </summary>
+    public static readonly bool DefaultReplicateSubscriptionState = false;
+
     /// <summary>
     /// The default subscription type.
     /// </summary>
@@ -56,6 +61,7 @@ public sealed class ConsumerOptions<TMessage>
         PriorityLevel = DefaultPriorityLevel;
         MessagePrefetchCount = DefaultMessagePrefetchCount;
         ReadCompacted = DefaultReadCompacted;
+        ReplicateSubscriptionState = DefaultReplicateSubscriptionState;
         SubscriptionType = DefaultSubscriptionType;
         SubscriptionProperties = new Dictionary<string, string>();
         SubscriptionName = subscriptionName;
@@ -88,6 +94,11 @@ public sealed class ConsumerOptions<TMessage>
     /// </summary>
     public bool ReadCompacted { get; set; }
 
+    /// <summary>
+    /// Whether to replicate the subscription's state across clusters (when using geo-replication). The default is 'false'.
+    /// </summary>
+    public bool ReplicateSubscriptionState { get; set; }
+
     /// <summary>
     /// Set the schema. This is required.
     /// </summary>
diff --git a/src/DotPulsar/Internal/ConsumerBuilder.cs b/src/DotPulsar/Internal/ConsumerBuilder.cs
index cf85cc1..49267af 100644
--- a/src/DotPulsar/Internal/ConsumerBuilder.cs
+++ b/src/DotPulsar/Internal/ConsumerBuilder.cs
@@ -27,6 +27,7 @@ public sealed class ConsumerBuilder<TMessage> : IConsumerBuilder<TMessage>
     private int _priorityLevel;
     private uint _messagePrefetchCount;
     private bool _readCompacted;
+    private bool _replicateSubscriptionState;
     private string? _subscriptionName;
     private readonly Dictionary<string, string> _subscriptionProperties;
     private SubscriptionType _subscriptionType;
@@ -41,6 +42,7 @@ public sealed class ConsumerBuilder<TMessage> : IConsumerBuilder<TMessage>
         _priorityLevel = ConsumerOptions<TMessage>.DefaultPriorityLevel;
         _messagePrefetchCount = ConsumerOptions<TMessage>.DefaultMessagePrefetchCount;
         _readCompacted = ConsumerOptions<TMessage>.DefaultReadCompacted;
+        _replicateSubscriptionState = ConsumerOptions<TMessage>.DefaultReplicateSubscriptionState;
         _subscriptionProperties = new Dictionary<string, string>();
         _subscriptionType = ConsumerOptions<TMessage>.DefaultSubscriptionType;
     }
@@ -75,6 +77,12 @@ public sealed class ConsumerBuilder<TMessage> : IConsumerBuilder<TMessage>
         return this;
     }
 
+    public IConsumerBuilder<TMessage> ReplicateSubscriptionState(bool replicateSubscriptionState)
+    {
+        _replicateSubscriptionState = replicateSubscriptionState;
+        return this;
+    }
+
     public IConsumerBuilder<TMessage> StateChangedHandler(IHandleStateChanged<ConsumerStateChanged> handler)
     {
         _stateChangedHandler = handler;
@@ -120,6 +128,7 @@ public sealed class ConsumerBuilder<TMessage> : IConsumerBuilder<TMessage>
             MessagePrefetchCount = _messagePrefetchCount,
             PriorityLevel = _priorityLevel,
             ReadCompacted = _readCompacted,
+            ReplicateSubscriptionState = _replicateSubscriptionState,
             StateChangedHandler = _stateChangedHandler,
             SubscriptionProperties = _subscriptionProperties,
             SubscriptionType = _subscriptionType
diff --git a/src/DotPulsar/PulsarClient.cs b/src/DotPulsar/PulsarClient.cs
index 5a0d8e1..66284ef 100644
--- a/src/DotPulsar/PulsarClient.cs
+++ b/src/DotPulsar/PulsarClient.cs
@@ -97,6 +97,7 @@ public sealed class PulsarClient : IPulsarClient
             InitialPosition = (CommandSubscribe.InitialPositionType) options.InitialPosition,
             PriorityLevel = options.PriorityLevel,
             ReadCompacted = options.ReadCompacted,
+            ReplicateSubscriptionState = options.ReplicateSubscriptionState,
             Subscription = options.SubscriptionName,
             Topic = options.Topic,
             Type = (CommandSubscribe.SubType) options.SubscriptionType