You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by xi...@apache.org on 2022/12/08 07:10:39 UTC

[pulsar] 01/02: [fix][broker]unify time unit at dropping the backlog on a topic (#17957)

This is an automated email from the ASF dual-hosted git repository.

xiangying pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 49e86d921c2ebbcaf0a93d3c11b2df666963c77d
Author: Qiang Huang <HQ...@users.noreply.github.com>
AuthorDate: Mon Oct 10 11:04:08 2022 +0800

    [fix][broker]unify time unit at dropping the backlog on a topic (#17957)
    
    (cherry picked from commit add77aa23a1cd8435b0a68687b461494ab1f6d26)
---
 .../pulsar/broker/service/BacklogQuotaManager.java |  2 +-
 .../broker/service/BacklogQuotaManagerTest.java    | 54 ++++++++++++++++++++++
 2 files changed, 55 insertions(+), 1 deletion(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BacklogQuotaManager.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BacklogQuotaManager.java
index 915f9e7c6b9..2744469ea8d 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BacklogQuotaManager.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BacklogQuotaManager.java
@@ -226,7 +226,7 @@ public class BacklogQuotaManager {
                     }
                     // Timestamp only > 0 if ledger has been closed
                     if (ledgerInfo.getTimestamp() > 0
-                            && currentMillis - ledgerInfo.getTimestamp() > quota.getLimitTime()) {
+                            && currentMillis - ledgerInfo.getTimestamp() > quota.getLimitTime() * 1000) {
                         // skip whole ledger for the slowest cursor
                         PositionImpl nextPosition =
                                 PositionImpl.get(mLedger.getNextValidLedger(ledgerInfo.getLedgerId()), -1);
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BacklogQuotaManagerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BacklogQuotaManagerTest.java
index 6519632f948..69c0b8dd00e 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BacklogQuotaManagerTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BacklogQuotaManagerTest.java
@@ -521,6 +521,60 @@ public class BacklogQuotaManagerTest {
         client.close();
     }
 
+    @Test(timeOut = 60000)
+    public void testConsumerBacklogEvictionTimeQuotaWithPartEviction() throws Exception {
+        assertEquals(admin.namespaces().getBacklogQuotaMap("prop/ns-quota"),
+                new HashMap<>());
+        admin.namespaces().setBacklogQuota("prop/ns-quota",
+                BacklogQuota.builder()
+                        .limitTime(5) // set limit time as 5 seconds
+                        .retentionPolicy(BacklogQuota.RetentionPolicy.consumer_backlog_eviction)
+                        .build(), BacklogQuota.BacklogQuotaType.message_age);
+        PulsarClient client = PulsarClient.builder().serviceUrl(adminUrl.toString()).statsInterval(0, TimeUnit.SECONDS)
+                .build();
+
+        final String topic1 = "persistent://prop/ns-quota/topic3" + UUID.randomUUID();
+        final String subName1 = "c1";
+        final String subName2 = "c2";
+        int numMsgs = 5;
+
+        Consumer<byte[]> consumer1 = client.newConsumer().topic(topic1).subscriptionName(subName1).subscribe();
+        Consumer<byte[]> consumer2 = client.newConsumer().topic(topic1).subscriptionName(subName2).subscribe();
+        org.apache.pulsar.client.api.Producer<byte[]> producer = createProducer(client, topic1);
+        byte[] content = new byte[1024];
+        for (int i = 0; i < numMsgs; i++) {
+            producer.send(content);
+            consumer1.receive();
+            consumer2.receive();
+        }
+
+        TopicStats stats = getTopicStats(topic1);
+        assertEquals(stats.getSubscriptions().get(subName1).getMsgBacklog(), 5);
+        assertEquals(stats.getSubscriptions().get(subName2).getMsgBacklog(), 5);
+
+        // Sleep 5000 millis so that the first 5 messages reach the 5-second limit.
+        Thread.sleep(5000L);
+        numMsgs = 9;
+        for (int i = 0; i < numMsgs; i++) {
+            producer.send(content);
+            consumer1.receive();
+            consumer2.receive();
+        }
+
+        // The first 5 messages are expired after sleeping 2000 more millis.
+        Thread.sleep(2000L);
+        rolloverStats();
+
+        // Re-fetch stats on each poll: the 5 messages older than the 5s limit are evicted, the last 9 are kept.
+        Awaitility.await().untilAsserted(() -> {
+            TopicStats latestStats = getTopicStats(topic1);
+            assertEquals(latestStats.getSubscriptions().get(subName1).getMsgBacklog(), 9);
+            assertEquals(latestStats.getSubscriptions().get(subName2).getMsgBacklog(), 9);
+        });
+        client.close();
+    }
+
+
     @Test
     public void testConsumerBacklogEvictionTimeQuotaWithEmptyLedger() throws Exception {
         assertEquals(admin.namespaces().getBacklogQuotaMap("prop/ns-quota"),