You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by rd...@apache.org on 2021/08/17 06:24:47 UTC

[pulsar] branch master updated: [server] Allow broker to start with default backlogquota in bytes (#11671)

This is an automated email from the ASF dual-hosted git repository.

rdhabalia pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 80171a7  [server] Allow broker to start with default backlogquota in bytes (#11671)
80171a7 is described below

commit 80171a733ab4799f912a8935f03c19554b9ca3b1
Author: Rajan Dhabalia <rd...@apache.org>
AuthorDate: Mon Aug 16 23:24:02 2021 -0700

    [server] Allow broker to start with default backlogquota in bytes (#11671)
    
    * [server] Allow broker to start with default backlogquota in bytes
    
    * remove sysout from test
    
    * fix test
---
 conf/broker.conf                                   |  5 ++-
 .../apache/pulsar/broker/ServiceConfiguration.java |  9 ++++-
 .../pulsar/broker/service/BacklogQuotaManager.java |  5 ++-
 .../org/apache/pulsar/broker/ConfigHelper.java     |  5 ++-
 .../broker/service/BacklogQuotaManagerTest.java    | 46 ++++++++++++++++++++++
 .../common/naming/ServiceConfigurationTest.java    |  8 ++--
 .../policies/data/impl/BacklogQuotaImpl.java       |  2 +-
 site2/docs/reference-configuration.md              |  4 +-
 8 files changed, 73 insertions(+), 11 deletions(-)

diff --git a/conf/broker.conf b/conf/broker.conf
index e4c15c7..404d8d7 100644
--- a/conf/broker.conf
+++ b/conf/broker.conf
@@ -125,9 +125,12 @@ backlogQuotaCheckEnabled=true
 # How often to check for topics that have reached the quota
 backlogQuotaCheckIntervalInSeconds=60
 
-# Default per-topic backlog quota limit, less than 0 means no limitation. default is -1.
+# Deprecated - Use backlogQuotaDefaultLimitByte instead.
 backlogQuotaDefaultLimitGB=-1
 
+# Default per-topic backlog quota limit, less than 0 means no limitation. default is -1.
+backlogQuotaDefaultLimitBytes=-1
+
 # Default per-topic backlog quota time limit in second, less than 0 means no limitation. default is -1.
 backlogQuotaDefaultLimitSecond=-1
 
diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
index aaec952..e98a5ce 100644
--- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
+++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
@@ -334,12 +334,19 @@ public class ServiceConfiguration implements PulsarConfiguration {
     )
     private int backlogQuotaCheckIntervalInSeconds = 60;
 
+    @Deprecated
+    @FieldContext(
+        category = CATEGORY_POLICIES,
+        doc = "@deprecated - Use backlogQuotaDefaultLimitByte instead.\""
+    )
+    private double backlogQuotaDefaultLimitGB = -1;
+
     @FieldContext(
         category = CATEGORY_POLICIES,
         doc = "Default per-topic backlog quota limit by size, less than 0 means no limitation. default is -1."
                 + " Increase it if you want to allow larger msg backlog"
     )
-    private long backlogQuotaDefaultLimitGB = -1;
+    private long backlogQuotaDefaultLimitBytes = -1;
 
     @FieldContext(
             category = CATEGORY_POLICIES,
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BacklogQuotaManager.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BacklogQuotaManager.java
index bbb3ddf..f9ccb74 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BacklogQuotaManager.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BacklogQuotaManager.java
@@ -53,9 +53,10 @@ public class BacklogQuotaManager {
 
     public BacklogQuotaManager(PulsarService pulsar) {
         this.isTopicLevelPoliciesEnable = pulsar.getConfiguration().isTopicLevelPoliciesEnabled();
+        double backlogQuotaGB = pulsar.getConfiguration().getBacklogQuotaDefaultLimitGB();
         this.defaultQuota = BacklogQuotaImpl.builder()
-                .limitSize(pulsar.getConfiguration().getBacklogQuotaDefaultLimitGB()
-                        * BacklogQuotaImpl.BYTES_IN_GIGABYTE)
+                .limitSize(backlogQuotaGB > 0 ? (long) backlogQuotaGB * BacklogQuotaImpl.BYTES_IN_GIGABYTE
+                        : pulsar.getConfiguration().getBacklogQuotaDefaultLimitBytes())
                 .limitTime(pulsar.getConfiguration().getBacklogQuotaDefaultLimitSecond())
                 .retentionPolicy(pulsar.getConfiguration().getBacklogQuotaDefaultRetentionPolicy())
                 .build();
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/ConfigHelper.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/ConfigHelper.java
index b929636..ca8231a 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/ConfigHelper.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/ConfigHelper.java
@@ -38,8 +38,11 @@ public class ConfigHelper {
     }
 
     public static BacklogQuota sizeBacklogQuota(ServiceConfiguration configuration) {
+        long backlogQuotaBytes = configuration.getBacklogQuotaDefaultLimitGB() > 0
+                ? ((long) configuration.getBacklogQuotaDefaultLimitGB() * BacklogQuotaImpl.BYTES_IN_GIGABYTE)
+                : configuration.getBacklogQuotaDefaultLimitBytes();
         return BacklogQuota.builder()
-                .limitSize(configuration.getBacklogQuotaDefaultLimitGB() * BacklogQuotaImpl.BYTES_IN_GIGABYTE)
+                .limitSize(backlogQuotaBytes)
                 .retentionPolicy(configuration.getBacklogQuotaDefaultRetentionPolicy())
                 .build();
     }
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BacklogQuotaManagerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BacklogQuotaManagerTest.java
index 560cd9e..0dac0c2 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BacklogQuotaManagerTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BacklogQuotaManagerTest.java
@@ -51,12 +51,14 @@ import org.apache.pulsar.common.policies.data.ClusterDataImpl;
 import org.apache.pulsar.common.policies.data.PersistentTopicInternalStats;
 import org.apache.pulsar.common.policies.data.TenantInfoImpl;
 import org.apache.pulsar.common.policies.data.TopicStats;
+import org.apache.pulsar.common.policies.data.impl.BacklogQuotaImpl;
 import org.apache.pulsar.zookeeper.LocalBookkeeperEnsemble;
 import org.awaitility.Awaitility;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.testng.annotations.AfterMethod;
 import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.DataProvider;
 import org.testng.annotations.Test;
 
 @Test(groups = "broker")
@@ -72,6 +74,11 @@ public class BacklogQuotaManagerTest {
     private static final int TIME_TO_CHECK_BACKLOG_QUOTA = 3;
     private static final int MAX_ENTRIES_PER_LEDGER = 5;
 
+    @DataProvider(name = "backlogQuotaSizeGB")
+    public Object[][] backlogQuotaSizeGB() {
+        return new Object[][] { { true }, { false } };
+    }
+
     @BeforeMethod
     void setup() throws Exception {
         try {
@@ -1211,5 +1218,44 @@ public class BacklogQuotaManagerTest {
         client.close();
     }
 
+    @Test(dataProvider = "backlogQuotaSizeGB")
+    public void testBacklogQuotaInGB(boolean backlogQuotaSizeGB) throws Exception {
+
+        pulsar.close();
+        long backlogQuotaByte = 10 * 1024;
+        if (backlogQuotaSizeGB) {
+            config.setBacklogQuotaDefaultLimitGB(((double) backlogQuotaByte) / BacklogQuotaImpl.BYTES_IN_GIGABYTE);
+        } else {
+            config.setBacklogQuotaDefaultLimitBytes(backlogQuotaByte);
+        }
+        config.setBacklogQuotaDefaultRetentionPolicy(BacklogQuota.RetentionPolicy.consumer_backlog_eviction);
+        pulsar = new PulsarService(config);
+        pulsar.start();
+
+        @Cleanup
+        PulsarClient client = PulsarClient.builder().serviceUrl(pulsar.getBrokerServiceUrl()).statsInterval(0, TimeUnit.SECONDS)
+                .build();
+
+        final String topic1 = "persistent://prop/ns-quota/topic2";
+        final String subName1 = "c1";
+        final String subName2 = "c2";
+        final int numMsgs = 20;
+
+        Consumer<byte[]> consumer1 = client.newConsumer().topic(topic1).subscriptionName(subName1).subscribe();
+        Consumer<byte[]> consumer2 = client.newConsumer().topic(topic1).subscriptionName(subName2).subscribe();
+        org.apache.pulsar.client.api.Producer<byte[]> producer = createProducer(client, topic1);
+        byte[] content = new byte[1024];
+        for (int i = 0; i < numMsgs; i++) {
+            producer.send(content);
+            consumer1.receive();
+            consumer2.receive();
+        }
+
+        Thread.sleep((TIME_TO_CHECK_BACKLOG_QUOTA + 1) * 1000);
+        rolloverStats();
+
+        TopicStats stats = admin.topics().getStats(topic1);
+        assertTrue(stats.getBacklogSize() < 10 * 1024, "Storage size is [" + stats.getStorageSize() + "]");
+    }
     private static final Logger LOG = LoggerFactory.getLogger(BacklogQuotaManagerTest.class);
 }
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/common/naming/ServiceConfigurationTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/common/naming/ServiceConfigurationTest.java
index 6b7df2c..452ee96 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/common/naming/ServiceConfigurationTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/common/naming/ServiceConfigurationTest.java
@@ -49,7 +49,7 @@ public class ServiceConfigurationTest {
     public void testInit() throws Exception {
         final String zookeeperServer = "localhost:2184";
         final int brokerServicePort = 1000;
-        InputStream newStream = updateProp(zookeeperServer, String.valueOf(brokerServicePort), "ns1,ns2");
+        InputStream newStream = updateProp(zookeeperServer, String.valueOf(brokerServicePort), "ns1,ns2", 0.05);
         final ServiceConfiguration config = PulsarConfigurationLoader.create(newStream, ServiceConfiguration.class);
         assertTrue(isNotBlank(config.getZookeeperServers()));
         assertTrue(config.getBrokerServicePort().isPresent()
@@ -60,6 +60,7 @@ public class ServiceConfigurationTest {
         assertEquals(config.getSupportedNamespaceBundleSplitAlgorithms().size(), 1);
         assertEquals(config.getMaxMessagePublishBufferSizeInMB(), -1);
         assertEquals(config.getManagedLedgerDataReadPriority(), "bookkeeper-first");
+        assertEquals(config.getBacklogQuotaDefaultLimitGB(), 0.05);
     }
 
     @Test
@@ -86,11 +87,11 @@ public class ServiceConfigurationTest {
     @Test(expectedExceptions = IllegalArgumentException.class)
     public void testInitFailure() throws Exception {
         final String zookeeperServer = "localhost:2184";
-        InputStream newStream = updateProp(zookeeperServer, "invalid-string", null);
+        InputStream newStream = updateProp(zookeeperServer, "invalid-string", null, 0.005);
         PulsarConfigurationLoader.create(newStream, ServiceConfiguration.class);
     }
 
-    private InputStream updateProp(String zookeeperServer, String brokerServicePort, String namespace)
+    private InputStream updateProp(String zookeeperServer, String brokerServicePort, String namespace, double backlogQuotaGB)
             throws IOException {
         checkNotNull(fileName);
         Properties properties = new Properties();
@@ -98,6 +99,7 @@ public class ServiceConfigurationTest {
         properties.load(stream);
         properties.setProperty("zookeeperServers", zookeeperServer);
         properties.setProperty("brokerServicePort", brokerServicePort);
+        properties.setProperty("backlogQuotaDefaultLimitGB", "" + backlogQuotaGB);
         if (namespace != null)
             properties.setProperty("bootstrapNamespaces", namespace);
         StringWriter writer = new StringWriter();
diff --git a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/impl/BacklogQuotaImpl.java b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/impl/BacklogQuotaImpl.java
index 79e0af2..591e8b8 100644
--- a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/impl/BacklogQuotaImpl.java
+++ b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/impl/BacklogQuotaImpl.java
@@ -40,7 +40,7 @@ public class BacklogQuotaImpl implements BacklogQuota {
     }
 
     public static class BacklogQuotaImplBuilder implements BacklogQuota.Builder {
-        private long limitSize = -1 * BYTES_IN_GIGABYTE;
+        private long limitSize = -1;
         private int limitTime = -1;
         private RetentionPolicy retentionPolicy;
 
diff --git a/site2/docs/reference-configuration.md b/site2/docs/reference-configuration.md
index ae62e27..a96c328 100644
--- a/site2/docs/reference-configuration.md
+++ b/site2/docs/reference-configuration.md
@@ -175,7 +175,7 @@ Pulsar brokers are responsible for handling incoming messages from producers, di
 |skipBrokerShutdownOnOOM| Flag to skip broker shutdown when broker handles Out of memory error. |false|
 |backlogQuotaCheckEnabled|  Enable backlog quota check. Enforces action on topic when the quota is reached  |true|
 |backlogQuotaCheckIntervalInSeconds|  How often to check for topics that have reached the quota |60|
-|backlogQuotaDefaultLimitGB| The default per-topic backlog quota limit. Being less than 0 means no limitation. By default, it is -1. | -1 |
+|backlogQuotaDefaultLimitBytes| The default per-topic backlog quota limit. Being less than 0 means no limitation. By default, it is -1. | -1 |
 |backlogQuotaDefaultRetentionPolicy|The defaulted backlog quota retention policy. By Default, it is `producer_request_hold`. <li>'producer_request_hold' Policy which holds producer's send request until the resource becomes available (or holding times out)</li> <li>'producer_exception' Policy which throws `javax.jms.ResourceAllocationException` to the producer </li><li>'consumer_backlog_eviction' Policy which evicts the oldest message from the slowest consumer's backlog</li>|producer_requ [...]
 |allowAutoTopicCreation| Enable topic auto creation if a new producer or consumer connected |true|
 |allowAutoTopicCreationType| The type of topic that is allowed to be automatically created.(partitioned/non-partitioned) |non-partitioned|
@@ -464,7 +464,7 @@ You can set the log level and configuration in the  [log4j2.yaml](https://github
 |skipBrokerShutdownOnOOM| Flag to skip broker shutdown when broker handles Out of memory error. |false|
 |backlogQuotaCheckEnabled|  Enable the backlog quota check, which enforces a specified action when the quota is reached.  |true|
 |backlogQuotaCheckIntervalInSeconds|  How often to check for topics that have reached the backlog quota.  |60|
-|backlogQuotaDefaultLimitGB| The default per-topic backlog quota limit. Being less than 0 means no limitation. By default, it is -1. |-1|
+|backlogQuotaDefaultLimitBytes| The default per-topic backlog quota limit. Being less than 0 means no limitation. By default, it is -1. |-1|
 |ttlDurationDefaultInSeconds|The default Time to Live (TTL) for namespaces if the TTL is not configured at namespace policies. When the value is set to `0`, TTL is disabled. By default, TTL is disabled. |0|
 |brokerDeleteInactiveTopicsEnabled| Enable the deletion of inactive topics. If topics are not consumed for some while, these inactive topics might be cleaned up. Deleting inactive topics is enabled by default. The default period is 1 minute. |true|
 |brokerDeleteInactiveTopicsFrequencySeconds|  How often to check for inactive topics, in seconds. |60|