You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by te...@apache.org on 2022/09/19 05:29:38 UTC

[pulsar] 01/03: [fix][broker] Fix namespace backlog quota check with retention. (#17706)

This is an automated email from the ASF dual-hosted git repository.

technoboy pushed a commit to branch branch-2.11
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 8b2694397ca8a26663c726402e4fa1ee01a076a0
Author: JiangHaiting <ji...@apache.org>
AuthorDate: Mon Sep 19 10:29:51 2022 +0800

    [fix][broker] Fix namespace backlog quota check with retention. (#17706)
---
 .../apache/pulsar/broker/admin/AdminResource.java  | 10 +++++--
 .../apache/pulsar/broker/admin/AdminApiTest.java   | 32 ++++++++++++++++++++++
 2 files changed, 39 insertions(+), 3 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
index 432611a69c0..c9078c63de7 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
@@ -355,17 +355,21 @@ public abstract class AdminResource extends PulsarWebResource {
     }
 
     protected boolean checkBacklogQuota(BacklogQuota quota, RetentionPolicies retention) {
-        if (retention == null || retention.getRetentionSizeInMB() <= 0 || retention.getRetentionTimeInMinutes() <= 0) {
+        if (retention == null
+                || (retention.getRetentionSizeInMB() <= 0 && retention.getRetentionTimeInMinutes() <= 0)) {
             return true;
         }
         if (quota == null) {
             quota = pulsar().getBrokerService().getBacklogQuotaManager().getDefaultQuota();
         }
-        if (quota.getLimitSize() >= (retention.getRetentionSizeInMB() * 1024 * 1024)) {
+
+        if (retention.getRetentionSizeInMB() > 0
+                && quota.getLimitSize() >= (retention.getRetentionSizeInMB() * 1024 * 1024)) {
             return false;
         }
         // time based quota is in second
-        if (quota.getLimitTime() >= (retention.getRetentionTimeInMinutes() * 60)) {
+        if (retention.getRetentionTimeInMinutes() > 0
+                && quota.getLimitTime() >= retention.getRetentionTimeInMinutes() * 60) {
             return false;
         }
         return true;
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java
index 2ec0d76551a..7993750583f 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java
@@ -3397,4 +3397,36 @@ public class AdminApiTest extends MockedPulsarServiceBaseTest {
         long value2 = partitionedStats.getEarliestMsgPublishTimeInBacklogs();
         Assert.assertNotEquals(value2, 0);
     }
+
+    @Test
+    public void testRetentionAndBacklogQuotaCheck() throws PulsarAdminException {
+        String namespace = "prop-xyz/ns1";
+        //test size check.
+        admin.namespaces().setRetention(namespace, new RetentionPolicies(-1, 10));
+        admin.namespaces().setBacklogQuota(namespace, BacklogQuota.builder().limitSize(9 * 1024 * 1024).build());
+        Assert.expectThrows(PulsarAdminException.PreconditionFailedException.class, () -> {
+            admin.namespaces().setBacklogQuota(namespace, BacklogQuota.builder().limitSize(100 * 1024 * 1024).build());
+        });
+
+        //test time check
+        admin.namespaces().setRetention(namespace, new RetentionPolicies(10, -1));
+        admin.namespaces().setBacklogQuota(namespace, BacklogQuota.builder().limitTime(9 * 60).build());
+        Assert.expectThrows(PulsarAdminException.PreconditionFailedException.class, () -> {
+            admin.namespaces().setBacklogQuota(namespace, BacklogQuota.builder().limitTime(11 * 60).build());
+        });
+
+        // test both size and time.
+        admin.namespaces().setRetention(namespace, new RetentionPolicies(10, 10));
+        admin.namespaces().setBacklogQuota(namespace, BacklogQuota.builder().limitSize(9 * 1024 * 1024).build());
+        admin.namespaces().setBacklogQuota(namespace, BacklogQuota.builder().limitTime(9 * 60).build());
+        admin.namespaces().setBacklogQuota(namespace, BacklogQuota.builder().limitSize(9 * 1024 * 1024).
+                limitTime(9 * 60).build());
+        Assert.expectThrows(PulsarAdminException.PreconditionFailedException.class, () -> {
+            admin.namespaces().setBacklogQuota(namespace, BacklogQuota.builder().limitSize(100 * 1024 * 1024).build());
+        });
+        Assert.expectThrows(PulsarAdminException.PreconditionFailedException.class, () -> {
+            admin.namespaces().setBacklogQuota(namespace, BacklogQuota.builder().limitTime(100 * 60).build());
+        });
+
+    }
 }