You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by yu...@apache.org on 2024/01/02 14:20:36 UTC

(pulsar) branch master updated: [improve][broker] PIP-299-part-2: add config dispatcherPauseOnAckStatePersistentEnabled (#21370)

This is an automated email from the ASF dual-hosted git repository.

yubiao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new c0b89eb7481 [improve][broker] PIP-299-part-2: add config dispatcherPauseOnAckStatePersistentEnabled (#21370)
c0b89eb7481 is described below

commit c0b89eb74811cb9e2451d6dfdda1efc2e4bb45c6
Author: fengyubiao <yu...@streamnative.io>
AuthorDate: Tue Jan 2 22:20:29 2024 +0800

    [improve][broker] PIP-299-part-2: add config dispatcherPauseOnAckStatePersistentEnabled (#21370)
    
    The part 2 of PIP-299: add config dispatcherPauseOnAckStatePersistentEnabled
---
 conf/broker.conf                                                  | 4 ++++
 conf/standalone.conf                                              | 4 ++++
 .../main/java/org/apache/pulsar/broker/ServiceConfiguration.java  | 7 +++++++
 .../java/org/apache/pulsar/broker/service/StandaloneTest.java     | 8 +++++---
 .../org/apache/pulsar/common/naming/ServiceConfigurationTest.java | 5 +++++
 .../src/test/resources/configurations/pulsar_broker_test.conf     | 1 +
 .../resources/configurations/pulsar_broker_test_standalone.conf   | 1 +
 7 files changed, 27 insertions(+), 3 deletions(-)

diff --git a/conf/broker.conf b/conf/broker.conf
index 82dd5640740..e4494be0666 100644
--- a/conf/broker.conf
+++ b/conf/broker.conf
@@ -1822,6 +1822,10 @@ subscriptionKeySharedEnable=true
 # Deprecated: use managedLedgerMaxUnackedRangesToPersistInMetadataStore
 managedLedgerMaxUnackedRangesToPersistInZooKeeper=-1
 
+# After enabling this feature, Pulsar will stop delivery messages to clients if the cursor metadata is too large to
+# persist, it will help to reduce the duplicates caused by the ack state that can not be fully persistent.
+dispatcherPauseOnAckStatePersistentEnabled=false
+
 # If enabled, the maximum "acknowledgment holes" will not be limited and "acknowledgment holes" are stored in
 # multiple entries.
 persistentUnackedRangesWithMultipleEntriesEnabled=false
diff --git a/conf/standalone.conf b/conf/standalone.conf
index cf13f12c8fe..a916a2f477e 100644
--- a/conf/standalone.conf
+++ b/conf/standalone.conf
@@ -1237,6 +1237,10 @@ configurationStoreServers=
 # Deprecated: use managedLedgerMaxUnackedRangesToPersistInMetadataStore
 managedLedgerMaxUnackedRangesToPersistInZooKeeper=-1
 
+# After enabling this feature, Pulsar will stop delivery messages to clients if the cursor metadata is too large to
+# persist, it will help to reduce the duplicates caused by the ack state that can not be fully persistent.
+dispatcherPauseOnAckStatePersistentEnabled=false
+
 # Whether to enable the delayed delivery for messages.
 # If disabled, messages will be immediately delivered and there will
 # be no tracking overhead.
diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
index 4f2d56fc07e..cfb66a7df78 100644
--- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
+++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
@@ -2096,6 +2096,13 @@ public class ServiceConfiguration implements PulsarConfiguration {
             doc = "Use Open Range-Set to cache unacked messages (it is memory efficient but it can take more cpu)"
         )
     private boolean managedLedgerUnackedRangesOpenCacheSetEnabled = true;
+    @FieldContext(
+        dynamic = true,
+        category = CATEGORY_STORAGE_ML,
+        doc = "After enabling this feature, Pulsar will stop delivery messages to clients if the cursor metadata is"
+            + " too large to persist, it will help to reduce the duplicates caused by the ack state that can not be"
+            + " fully persistent. Default false.")
+    private boolean dispatcherPauseOnAckStatePersistentEnabled = false;
     @FieldContext(
         dynamic = true,
         category = CATEGORY_STORAGE_ML,
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/StandaloneTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/StandaloneTest.java
index 5100826530c..b99f8d5338f 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/StandaloneTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/StandaloneTest.java
@@ -18,11 +18,12 @@
  */
 package org.apache.pulsar.broker.service;
 
+import org.apache.pulsar.PulsarStandaloneStarter;
+import org.testng.annotations.Test;
+
 import static org.testng.AssertJUnit.assertEquals;
 import static org.testng.AssertJUnit.assertNotNull;
 import static org.testng.AssertJUnit.assertNull;
-import org.apache.pulsar.PulsarStandaloneStarter;
-import org.testng.annotations.Test;
 
 @Test(groups = "broker")
 public class StandaloneTest {
@@ -54,12 +55,13 @@ public class StandaloneTest {
     }
 
     @Test
-    public void testAdvertised() throws Exception {
+    public void testInitialize() throws Exception {
         String[] args = new String[]{"--config",
                 "./src/test/resources/configurations/pulsar_broker_test_standalone.conf"};
         PulsarStandaloneStarter standalone = new TestPulsarStandaloneStarter(args);
         assertNull(standalone.getConfig().getAdvertisedAddress());
         assertEquals(standalone.getConfig().getAdvertisedListeners(),
                 "internal:pulsar://192.168.1.11:6660,internal:pulsar+ssl://192.168.1.11:6651");
+        assertEquals(standalone.getConfig().isDispatcherPauseOnAckStatePersistentEnabled(), true);
     }
 }
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/common/naming/ServiceConfigurationTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/common/naming/ServiceConfigurationTest.java
index 55971c15adf..ebeaffc48e4 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/common/naming/ServiceConfigurationTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/common/naming/ServiceConfigurationTest.java
@@ -73,6 +73,7 @@ public class ServiceConfigurationTest {
         assertEquals(config.getManagedLedgerDataReadPriority(), "bookkeeper-first");
         assertEquals(config.getBacklogQuotaDefaultLimitGB(), 0.05);
         assertEquals(config.getHttpMaxRequestHeaderSize(), 1234);
+        assertEquals(config.isDispatcherPauseOnAckStatePersistentEnabled(), true);
         OffloadPoliciesImpl offloadPolicies = OffloadPoliciesImpl.create(config.getProperties());
         assertEquals(offloadPolicies.getManagedLedgerOffloadedReadPriority().getValue(), "bookkeeper-first");
     }
@@ -289,6 +290,7 @@ public class ServiceConfigurationTest {
             assertEquals(configuration.getTransactionPendingAckBatchedWriteMaxRecords(), 512);
             assertEquals(configuration.getTransactionPendingAckBatchedWriteMaxSize(), 1024 * 1024 * 4);
             assertEquals(configuration.getTransactionPendingAckBatchedWriteMaxDelayInMillis(), 1);
+            assertEquals(configuration.isDispatcherPauseOnAckStatePersistentEnabled(), false);
         }
         // pulsar_broker_test.conf.
         try (InputStream inputStream = this.getClass().getClassLoader().getResourceAsStream(fileName)) {
@@ -301,6 +303,7 @@ public class ServiceConfigurationTest {
             assertEquals(configuration.getTransactionPendingAckBatchedWriteMaxRecords(), 44);
             assertEquals(configuration.getTransactionPendingAckBatchedWriteMaxSize(), 55);
             assertEquals(configuration.getTransactionPendingAckBatchedWriteMaxDelayInMillis(), 66);
+            assertEquals(configuration.isDispatcherPauseOnAckStatePersistentEnabled(), true);
         }
         // string input stream.
         StringBuilder stringBuilder = new StringBuilder();
@@ -312,6 +315,7 @@ public class ServiceConfigurationTest {
         stringBuilder.append("transactionPendingAckBatchedWriteMaxRecords=521").append(System.lineSeparator());
         stringBuilder.append("transactionPendingAckBatchedWriteMaxSize=1025").append(System.lineSeparator());
         stringBuilder.append("transactionPendingAckBatchedWriteMaxDelayInMillis=20").append(System.lineSeparator());
+        stringBuilder.append("dispatcherPauseOnAckStatePersistentEnabled=true").append(System.lineSeparator());
         try(ByteArrayInputStream inputStream =
                     new ByteArrayInputStream(stringBuilder.toString().getBytes(StandardCharsets.UTF_8))){
             configuration = PulsarConfigurationLoader.create(inputStream, ServiceConfiguration.class);
@@ -323,6 +327,7 @@ public class ServiceConfigurationTest {
             assertEquals(configuration.getTransactionPendingAckBatchedWriteMaxRecords(), 521);
             assertEquals(configuration.getTransactionPendingAckBatchedWriteMaxSize(), 1025);
             assertEquals(configuration.getTransactionPendingAckBatchedWriteMaxDelayInMillis(), 20);
+            assertEquals(configuration.isDispatcherPauseOnAckStatePersistentEnabled(), true);
         }
     }
 
diff --git a/pulsar-broker/src/test/resources/configurations/pulsar_broker_test.conf b/pulsar-broker/src/test/resources/configurations/pulsar_broker_test.conf
index bfbbfb7487c..f2316111f80 100644
--- a/pulsar-broker/src/test/resources/configurations/pulsar_broker_test.conf
+++ b/pulsar-broker/src/test/resources/configurations/pulsar_broker_test.conf
@@ -93,6 +93,7 @@ brokerDeleteInactiveTopicsMode=delete_when_subscriptions_caught_up
 supportedNamespaceBundleSplitAlgorithms=[range_equally_divide]
 defaultNamespaceBundleSplitAlgorithm=topic_count_equally_divide
 maxMessagePublishBufferSizeInMB=-1
+dispatcherPauseOnAckStatePersistentEnabled=true
 
 ### --- Transaction config variables --- ###
 transactionLogBatchedWriteEnabled=true
diff --git a/pulsar-broker/src/test/resources/configurations/pulsar_broker_test_standalone.conf b/pulsar-broker/src/test/resources/configurations/pulsar_broker_test_standalone.conf
index c733409fc00..4a40d9f0c65 100644
--- a/pulsar-broker/src/test/resources/configurations/pulsar_broker_test_standalone.conf
+++ b/pulsar-broker/src/test/resources/configurations/pulsar_broker_test_standalone.conf
@@ -94,3 +94,4 @@ brokerDeleteInactiveTopicsMode=delete_when_subscriptions_caught_up
 supportedNamespaceBundleSplitAlgorithms=[range_equally_divide]
 defaultNamespaceBundleSplitAlgorithm=topic_count_equally_divide
 maxMessagePublishBufferSizeInMB=-1
+dispatcherPauseOnAckStatePersistentEnabled=true