You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by bo...@apache.org on 2022/12/07 06:59:20 UTC
[pulsar] branch branch-2.9 updated: [fix][broker]unify time unit at dropping the backlog on a topic (#17957)
This is an automated email from the ASF dual-hosted git repository.
bogong pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-2.9 by this push:
new 9d2b51be518 [fix][broker]unify time unit at dropping the backlog on a topic (#17957)
9d2b51be518 is described below
commit 9d2b51be5185fa0df208f5c093306665050fa6f8
Author: Qiang Huang <HQ...@users.noreply.github.com>
AuthorDate: Mon Oct 10 11:04:08 2022 +0800
[fix][broker]unify time unit at dropping the backlog on a topic (#17957)
(cherry picked from commit add77aa23a1cd8435b0a68687b461494ab1f6d26)
---
.../pulsar/broker/service/BacklogQuotaManager.java | 2 +-
.../broker/service/BacklogQuotaManagerTest.java | 54 ++++++++++++++++++++++
2 files changed, 55 insertions(+), 1 deletion(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BacklogQuotaManager.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BacklogQuotaManager.java
index 607e7387fb3..aa8eb981d44 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BacklogQuotaManager.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BacklogQuotaManager.java
@@ -255,7 +255,7 @@ public class BacklogQuotaManager {
}
// Timestamp only > 0 if ledger has been closed
if (ledgerInfo.getTimestamp() > 0
- && currentMillis - ledgerInfo.getTimestamp() > quota.getLimitTime()) {
+ && currentMillis - ledgerInfo.getTimestamp() > quota.getLimitTime() * 1000) {
// skip whole ledger for the slowest cursor
PositionImpl nextPosition =
PositionImpl.get(mLedger.getNextValidLedger(ledgerInfo.getLedgerId()), -1);
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BacklogQuotaManagerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BacklogQuotaManagerTest.java
index bedd20df226..7a6de0d7072 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BacklogQuotaManagerTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BacklogQuotaManagerTest.java
@@ -522,6 +522,60 @@ public class BacklogQuotaManagerTest {
client.close();
}
+    /** Verifies that a message_age quota evicts only expired messages: 5 old ones dropped, 9 recent ones kept. */
+    @Test(timeOut = 60000)
+    public void testConsumerBacklogEvictionTimeQuotaWithPartEviction() throws Exception {
+        assertEquals(admin.namespaces().getBacklogQuotaMap("prop/ns-quota"), new HashMap<>());
+        admin.namespaces().setBacklogQuota("prop/ns-quota",
+                BacklogQuota.builder()
+                        .limitTime(5) // set limit time as 5 seconds
+                        .retentionPolicy(BacklogQuota.RetentionPolicy.consumer_backlog_eviction)
+                        .build(), BacklogQuota.BacklogQuotaType.message_age);
+        PulsarClient client = PulsarClient.builder().serviceUrl(adminUrl.toString()).statsInterval(0, TimeUnit.SECONDS)
+                .build();
+
+        final String topic1 = "persistent://prop/ns-quota/topic3" + UUID.randomUUID();
+        final String subName1 = "c1";
+        final String subName2 = "c2";
+        int numMsgs = 5;
+
+        Consumer<byte[]> consumer1 = client.newConsumer().topic(topic1).subscriptionName(subName1).subscribe();
+        Consumer<byte[]> consumer2 = client.newConsumer().topic(topic1).subscriptionName(subName2).subscribe();
+        org.apache.pulsar.client.api.Producer<byte[]> producer = createProducer(client, topic1);
+        byte[] content = new byte[1024];
+        for (int i = 0; i < numMsgs; i++) {
+            producer.send(content);
+            consumer1.receive();
+            consumer2.receive();
+        }
+
+        TopicStats stats = getTopicStats(topic1);
+        assertEquals(stats.getSubscriptions().get(subName1).getMsgBacklog(), 5);
+        assertEquals(stats.getSubscriptions().get(subName2).getMsgBacklog(), 5);
+
+        // Let the first 5 messages age for 5000 ms before publishing the second batch.
+        Thread.sleep(5000L);
+        numMsgs = 9;
+        for (int i = 0; i < numMsgs; i++) {
+            producer.send(content);
+            consumer1.receive();
+            consumer2.receive();
+        }
+
+        // After 2000 more ms the first 5 messages exceed the 5 second limit; the later 9 do not.
+        Thread.sleep(2000L);
+        rolloverStats();
+
+        // Re-read stats on each poll: the first 5 messages (older than 5 s) must be evicted, the last 9 must stay.
+        Awaitility.await().untilAsserted(() -> {
+            TopicStats stats2 = getTopicStats(topic1);
+            assertEquals(stats2.getSubscriptions().get(subName1).getMsgBacklog(), 9);
+            assertEquals(stats2.getSubscriptions().get(subName2).getMsgBacklog(), 9);
+        });
+        client.close();
+    }
+
+
@Test
public void testConsumerBacklogEvictionTimeQuotaWithEmptyLedger() throws Exception {
assertEquals(admin.namespaces().getBacklogQuotaMap("prop/ns-quota"),