You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by yu...@apache.org on 2024/01/02 14:20:36 UTC
(pulsar) branch master updated: [improve][broker] PIP-299-part-2: add config dispatcherPauseOnAckStatePersistentEnabled (#21370)
This is an automated email from the ASF dual-hosted git repository.
yubiao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new c0b89eb7481 [improve][broker] PIP-299-part-2: add config dispatcherPauseOnAckStatePersistentEnabled (#21370)
c0b89eb7481 is described below
commit c0b89eb74811cb9e2451d6dfdda1efc2e4bb45c6
Author: fengyubiao <yu...@streamnative.io>
AuthorDate: Tue Jan 2 22:20:29 2024 +0800
[improve][broker] PIP-299-part-2: add config dispatcherPauseOnAckStatePersistentEnabled (#21370)
Part 2 of PIP-299: add the config dispatcherPauseOnAckStatePersistentEnabled
---
conf/broker.conf | 4 ++++
conf/standalone.conf | 4 ++++
.../main/java/org/apache/pulsar/broker/ServiceConfiguration.java | 7 +++++++
.../java/org/apache/pulsar/broker/service/StandaloneTest.java | 8 +++++---
.../org/apache/pulsar/common/naming/ServiceConfigurationTest.java | 5 +++++
.../src/test/resources/configurations/pulsar_broker_test.conf | 1 +
.../resources/configurations/pulsar_broker_test_standalone.conf | 1 +
7 files changed, 27 insertions(+), 3 deletions(-)
diff --git a/conf/broker.conf b/conf/broker.conf
index 82dd5640740..e4494be0666 100644
--- a/conf/broker.conf
+++ b/conf/broker.conf
@@ -1822,6 +1822,10 @@ subscriptionKeySharedEnable=true
# Deprecated: use managedLedgerMaxUnackedRangesToPersistInMetadataStore
managedLedgerMaxUnackedRangesToPersistInZooKeeper=-1
+# After enabling this feature, Pulsar will stop delivering messages to clients if the cursor metadata is too large to
+# persist. This helps to reduce the duplicates caused by ack state that cannot be fully persisted.
+dispatcherPauseOnAckStatePersistentEnabled=false
+
# If enabled, the maximum "acknowledgment holes" will not be limited and "acknowledgment holes" are stored in
# multiple entries.
persistentUnackedRangesWithMultipleEntriesEnabled=false
diff --git a/conf/standalone.conf b/conf/standalone.conf
index cf13f12c8fe..a916a2f477e 100644
--- a/conf/standalone.conf
+++ b/conf/standalone.conf
@@ -1237,6 +1237,10 @@ configurationStoreServers=
# Deprecated: use managedLedgerMaxUnackedRangesToPersistInMetadataStore
managedLedgerMaxUnackedRangesToPersistInZooKeeper=-1
+# After enabling this feature, Pulsar will stop delivering messages to clients if the cursor metadata is too large to
+# persist. This helps to reduce the duplicates caused by ack state that cannot be fully persisted.
+dispatcherPauseOnAckStatePersistentEnabled=false
+
# Whether to enable the delayed delivery for messages.
# If disabled, messages will be immediately delivered and there will
# be no tracking overhead.
diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
index 4f2d56fc07e..cfb66a7df78 100644
--- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
+++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
@@ -2096,6 +2096,13 @@ public class ServiceConfiguration implements PulsarConfiguration {
doc = "Use Open Range-Set to cache unacked messages (it is memory efficient but it can take more cpu)"
)
private boolean managedLedgerUnackedRangesOpenCacheSetEnabled = true;
+ @FieldContext(
+ dynamic = true,
+ category = CATEGORY_STORAGE_ML,
+ doc = "After enabling this feature, Pulsar will stop delivery messages to clients if the cursor metadata is"
+ + " too large to persist, it will help to reduce the duplicates caused by the ack state that can not be"
+ + " fully persistent. Default false.")
+ private boolean dispatcherPauseOnAckStatePersistentEnabled = false;
@FieldContext(
dynamic = true,
category = CATEGORY_STORAGE_ML,
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/StandaloneTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/StandaloneTest.java
index 5100826530c..b99f8d5338f 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/StandaloneTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/StandaloneTest.java
@@ -18,11 +18,12 @@
*/
package org.apache.pulsar.broker.service;
+import org.apache.pulsar.PulsarStandaloneStarter;
+import org.testng.annotations.Test;
+
import static org.testng.AssertJUnit.assertEquals;
import static org.testng.AssertJUnit.assertNotNull;
import static org.testng.AssertJUnit.assertNull;
-import org.apache.pulsar.PulsarStandaloneStarter;
-import org.testng.annotations.Test;
@Test(groups = "broker")
public class StandaloneTest {
@@ -54,12 +55,13 @@ public class StandaloneTest {
}
@Test
- public void testAdvertised() throws Exception {
+ public void testInitialize() throws Exception {
String[] args = new String[]{"--config",
"./src/test/resources/configurations/pulsar_broker_test_standalone.conf"};
PulsarStandaloneStarter standalone = new TestPulsarStandaloneStarter(args);
assertNull(standalone.getConfig().getAdvertisedAddress());
assertEquals(standalone.getConfig().getAdvertisedListeners(),
"internal:pulsar://192.168.1.11:6660,internal:pulsar+ssl://192.168.1.11:6651");
+ assertEquals(standalone.getConfig().isDispatcherPauseOnAckStatePersistentEnabled(), true);
}
}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/common/naming/ServiceConfigurationTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/common/naming/ServiceConfigurationTest.java
index 55971c15adf..ebeaffc48e4 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/common/naming/ServiceConfigurationTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/common/naming/ServiceConfigurationTest.java
@@ -73,6 +73,7 @@ public class ServiceConfigurationTest {
assertEquals(config.getManagedLedgerDataReadPriority(), "bookkeeper-first");
assertEquals(config.getBacklogQuotaDefaultLimitGB(), 0.05);
assertEquals(config.getHttpMaxRequestHeaderSize(), 1234);
+ assertEquals(config.isDispatcherPauseOnAckStatePersistentEnabled(), true);
OffloadPoliciesImpl offloadPolicies = OffloadPoliciesImpl.create(config.getProperties());
assertEquals(offloadPolicies.getManagedLedgerOffloadedReadPriority().getValue(), "bookkeeper-first");
}
@@ -289,6 +290,7 @@ public class ServiceConfigurationTest {
assertEquals(configuration.getTransactionPendingAckBatchedWriteMaxRecords(), 512);
assertEquals(configuration.getTransactionPendingAckBatchedWriteMaxSize(), 1024 * 1024 * 4);
assertEquals(configuration.getTransactionPendingAckBatchedWriteMaxDelayInMillis(), 1);
+ assertEquals(configuration.isDispatcherPauseOnAckStatePersistentEnabled(), false);
}
// pulsar_broker_test.conf.
try (InputStream inputStream = this.getClass().getClassLoader().getResourceAsStream(fileName)) {
@@ -301,6 +303,7 @@ public class ServiceConfigurationTest {
assertEquals(configuration.getTransactionPendingAckBatchedWriteMaxRecords(), 44);
assertEquals(configuration.getTransactionPendingAckBatchedWriteMaxSize(), 55);
assertEquals(configuration.getTransactionPendingAckBatchedWriteMaxDelayInMillis(), 66);
+ assertEquals(configuration.isDispatcherPauseOnAckStatePersistentEnabled(), true);
}
// string input stream.
StringBuilder stringBuilder = new StringBuilder();
@@ -312,6 +315,7 @@ public class ServiceConfigurationTest {
stringBuilder.append("transactionPendingAckBatchedWriteMaxRecords=521").append(System.lineSeparator());
stringBuilder.append("transactionPendingAckBatchedWriteMaxSize=1025").append(System.lineSeparator());
stringBuilder.append("transactionPendingAckBatchedWriteMaxDelayInMillis=20").append(System.lineSeparator());
+ stringBuilder.append("dispatcherPauseOnAckStatePersistentEnabled=true").append(System.lineSeparator());
try(ByteArrayInputStream inputStream =
new ByteArrayInputStream(stringBuilder.toString().getBytes(StandardCharsets.UTF_8))){
configuration = PulsarConfigurationLoader.create(inputStream, ServiceConfiguration.class);
@@ -323,6 +327,7 @@ public class ServiceConfigurationTest {
assertEquals(configuration.getTransactionPendingAckBatchedWriteMaxRecords(), 521);
assertEquals(configuration.getTransactionPendingAckBatchedWriteMaxSize(), 1025);
assertEquals(configuration.getTransactionPendingAckBatchedWriteMaxDelayInMillis(), 20);
+ assertEquals(configuration.isDispatcherPauseOnAckStatePersistentEnabled(), true);
}
}
diff --git a/pulsar-broker/src/test/resources/configurations/pulsar_broker_test.conf b/pulsar-broker/src/test/resources/configurations/pulsar_broker_test.conf
index bfbbfb7487c..f2316111f80 100644
--- a/pulsar-broker/src/test/resources/configurations/pulsar_broker_test.conf
+++ b/pulsar-broker/src/test/resources/configurations/pulsar_broker_test.conf
@@ -93,6 +93,7 @@ brokerDeleteInactiveTopicsMode=delete_when_subscriptions_caught_up
supportedNamespaceBundleSplitAlgorithms=[range_equally_divide]
defaultNamespaceBundleSplitAlgorithm=topic_count_equally_divide
maxMessagePublishBufferSizeInMB=-1
+dispatcherPauseOnAckStatePersistentEnabled=true
### --- Transaction config variables --- ###
transactionLogBatchedWriteEnabled=true
diff --git a/pulsar-broker/src/test/resources/configurations/pulsar_broker_test_standalone.conf b/pulsar-broker/src/test/resources/configurations/pulsar_broker_test_standalone.conf
index c733409fc00..4a40d9f0c65 100644
--- a/pulsar-broker/src/test/resources/configurations/pulsar_broker_test_standalone.conf
+++ b/pulsar-broker/src/test/resources/configurations/pulsar_broker_test_standalone.conf
@@ -94,3 +94,4 @@ brokerDeleteInactiveTopicsMode=delete_when_subscriptions_caught_up
supportedNamespaceBundleSplitAlgorithms=[range_equally_divide]
defaultNamespaceBundleSplitAlgorithm=topic_count_equally_divide
maxMessagePublishBufferSizeInMB=-1
+dispatcherPauseOnAckStatePersistentEnabled=true