You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2022/10/07 03:42:54 UTC

[GitHub] [pulsar] HQebupt opened a new pull request, #17957: [fix][broker]unify time unit at dropping the backlog on a topic

HQebupt opened a new pull request, #17957:
URL: https://github.com/apache/pulsar/pull/17957

   ### Motivation
   The backlog quota in time does not work due to time unit inconsistency in dropping the backlog on a topic.
   The unit of backlog quota limit in time is seconds.
   https://github.com/apache/pulsar/blob/8d13ff81108b95ae5f63245e4a5edf4e097aaf47/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/BacklogQuota.java#L47-L52
   
   The time unit inconsistency is here as follows.
   https://github.com/apache/pulsar/blob/8d13ff81108b95ae5f63245e4a5edf4e097aaf47/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BacklogQuotaManager.java#L221-L224
   
   ### Modifications
   unify time unit.
   
   ### Verifying this change
   
   - [x]  Make sure that the change passes the CI checks.
   
   This change is a trivial rework / code cleanup without any test coverage.
   ### Does this pull request potentially affect one of the following parts:
   
   If `yes` was chosen, please highlight the changes
   
   - Dependencies (does it add or upgrade a dependency): (no)
   - The public API: (no)
   - The schema: (no)
   - The default values of configurations: (no)
   - The wire protocol: (no)
   - The rest endpoints: (no)
   - The admin cli options: (no)
   - Anything that affects deployment: (no)
   
   ### Documentation
   
   Check the box below or label this PR directly.
   
   Need to update docs?
   
   - [ ] `doc-required`
   - [x] `doc-not-needed`
   - [ ] `doc` 
   - [ ] `doc-complete`
   
   ### Matching PR in forked repository
   
   PR in forked repository: https://github.com/HQebupt/pulsar/pull/3


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] codelipenghui commented on a diff in pull request #17957: [fix][broker]unify time unit at dropping the backlog on a topic

Posted by GitBox <gi...@apache.org>.
codelipenghui commented on code in PR #17957:
URL: https://github.com/apache/pulsar/pull/17957#discussion_r990888430


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BacklogQuotaManager.java:
##########
@@ -220,7 +220,7 @@ private void dropBacklogForTimeLimit(PersistentTopic persistentTopic, BacklogQuo
                     }
                     // Timestamp only > 0 if ledger has been closed
                     if (ledgerInfo.getTimestamp() > 0
-                            && currentMillis - ledgerInfo.getTimestamp() > quota.getLimitTime()) {
+                            && currentMillis - ledgerInfo.getTimestamp() > quota.getLimitTime() * 1000) {

Review Comment:
   It's a confusing name :)
   
   We should deprecate the old name `limitTime` and introduce a new one `limitTimeInSec`
   Not for this PR, we can start a new discussion in the dev mailing list and make it only happen on the next major release @HQebupt 
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] HQebupt commented on pull request #17957: [fix][broker]unify time unit at dropping the backlog on a topic

Posted by GitBox <gi...@apache.org>.
HQebupt commented on PR #17957:
URL: https://github.com/apache/pulsar/pull/17957#issuecomment-1272277339

   /pulsarbot run-failure-checks


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] codelipenghui merged pull request #17957: [fix][broker]unify time unit at dropping the backlog on a topic

Posted by GitBox <gi...@apache.org>.
codelipenghui merged PR #17957:
URL: https://github.com/apache/pulsar/pull/17957


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] HQebupt commented on a diff in pull request #17957: [fix][broker]unify time unit at dropping the backlog on a topic

Posted by GitBox <gi...@apache.org>.
HQebupt commented on code in PR #17957:
URL: https://github.com/apache/pulsar/pull/17957#discussion_r989882080


##########
pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BacklogQuotaManagerTest.java:
##########
@@ -527,6 +527,60 @@ public void testConsumerBacklogEvictionTimeQuota() throws Exception {
         client.close();
     }
 
+    @Test(timeOut = 60000)
+    public void testConsumerBacklogEvictionTimeQuotaWithoutEviction() throws Exception {
+        assertEquals(admin.namespaces().getBacklogQuotaMap("prop/ns-quota"),
+                new HashMap<>());
+        admin.namespaces().setBacklogQuota("prop/ns-quota",
+                BacklogQuota.builder()
+                        .limitTime(5) // set limit time as 5 seconds
+                        .retentionPolicy(BacklogQuota.RetentionPolicy.consumer_backlog_eviction)
+                        .build(), BacklogQuota.BacklogQuotaType.message_age);
+        PulsarClient client = PulsarClient.builder().serviceUrl(adminUrl.toString()).statsInterval(0, TimeUnit.SECONDS)
+                .build();
+
+        final String topic1 = "persistent://prop/ns-quota/topic3" + UUID.randomUUID();
+        final String subName1 = "c1";
+        final String subName2 = "c2";
+        int numMsgs = 5;
+
+        Consumer<byte[]> consumer1 = client.newConsumer().topic(topic1).subscriptionName(subName1).subscribe();
+        Consumer<byte[]> consumer2 = client.newConsumer().topic(topic1).subscriptionName(subName2).subscribe();
+        org.apache.pulsar.client.api.Producer<byte[]> producer = createProducer(client, topic1);
+        byte[] content = new byte[1024];
+        for (int i = 0; i < numMsgs; i++) {
+            producer.send(content);
+            consumer1.receive();
+            consumer2.receive();
+        }
+
+        TopicStats stats = getTopicStats(topic1);
+        assertEquals(stats.getSubscriptions().get(subName1).getMsgBacklog(), 5);
+        assertEquals(stats.getSubscriptions().get(subName2).getMsgBacklog(), 5);
+
+        // Sleep 5000 mills for first 5 messages.
+        Thread.sleep(5000l);
+        numMsgs = 9;
+        for (int i = 0; i < numMsgs; i++) {
+            producer.send(content);
+            consumer1.receive();
+            consumer2.receive();

Review Comment:
   Just let consumers keep to consume all messages just like other test cases in BacklogQuotaManagerTest.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] HQebupt commented on pull request #17957: [fix][broker]unify time unit at dropping the backlog on a topic

Posted by GitBox <gi...@apache.org>.
HQebupt commented on PR #17957:
URL: https://github.com/apache/pulsar/pull/17957#issuecomment-1272243802

   /pulsarbot ready-to-test


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] HQebupt commented on pull request #17957: [fix][broker]unify time unit at dropping the backlog on a topic

Posted by GitBox <gi...@apache.org>.
HQebupt commented on PR #17957:
URL: https://github.com/apache/pulsar/pull/17957#issuecomment-1271077183

   @codelipenghui @Technoboy- @eolivelli @Jason918 @AnonHxy @gaozhangmin PTAL


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] HQebupt commented on a diff in pull request #17957: [fix][broker]unify time unit at dropping the backlog on a topic

Posted by GitBox <gi...@apache.org>.
HQebupt commented on code in PR #17957:
URL: https://github.com/apache/pulsar/pull/17957#discussion_r989875819


##########
pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BacklogQuotaManagerTest.java:
##########
@@ -527,6 +527,60 @@ public void testConsumerBacklogEvictionTimeQuota() throws Exception {
         client.close();
     }
 
+    @Test(timeOut = 60000)
+    public void testConsumerBacklogEvictionTimeQuotaWithoutEviction() throws Exception {
+        assertEquals(admin.namespaces().getBacklogQuotaMap("prop/ns-quota"),
+                new HashMap<>());
+        admin.namespaces().setBacklogQuota("prop/ns-quota",
+                BacklogQuota.builder()
+                        .limitTime(5) // set limit time as 5 seconds
+                        .retentionPolicy(BacklogQuota.RetentionPolicy.consumer_backlog_eviction)
+                        .build(), BacklogQuota.BacklogQuotaType.message_age);
+        PulsarClient client = PulsarClient.builder().serviceUrl(adminUrl.toString()).statsInterval(0, TimeUnit.SECONDS)
+                .build();
+
+        final String topic1 = "persistent://prop/ns-quota/topic3" + UUID.randomUUID();
+        final String subName1 = "c1";
+        final String subName2 = "c2";
+        int numMsgs = 5;
+
+        Consumer<byte[]> consumer1 = client.newConsumer().topic(topic1).subscriptionName(subName1).subscribe();
+        Consumer<byte[]> consumer2 = client.newConsumer().topic(topic1).subscriptionName(subName2).subscribe();
+        org.apache.pulsar.client.api.Producer<byte[]> producer = createProducer(client, topic1);
+        byte[] content = new byte[1024];
+        for (int i = 0; i < numMsgs; i++) {
+            producer.send(content);
+            consumer1.receive();
+            consumer2.receive();
+        }
+
+        TopicStats stats = getTopicStats(topic1);
+        assertEquals(stats.getSubscriptions().get(subName1).getMsgBacklog(), 5);
+        assertEquals(stats.getSubscriptions().get(subName2).getMsgBacklog(), 5);
+
+        // Sleep 5000 mills for first 5 messages.
+        Thread.sleep(5000l);

Review Comment:
   Good.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] HQebupt closed pull request #17957: [fix][broker]unify time unit at dropping the backlog on a topic

Posted by GitBox <gi...@apache.org>.
HQebupt closed pull request #17957: [fix][broker]unify time unit at dropping the backlog on a topic
URL: https://github.com/apache/pulsar/pull/17957


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] HQebupt commented on pull request #17957: [fix][broker]unify time unit at dropping the backlog on a topic

Posted by GitBox <gi...@apache.org>.
HQebupt commented on PR #17957:
URL: https://github.com/apache/pulsar/pull/17957#issuecomment-1272517756

   /pulsarbot run-failure-checks


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] AnonHxy commented on a diff in pull request #17957: [fix][broker]unify time unit at dropping the backlog on a topic

Posted by GitBox <gi...@apache.org>.
AnonHxy commented on code in PR #17957:
URL: https://github.com/apache/pulsar/pull/17957#discussion_r989798066


##########
pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BacklogQuotaManagerTest.java:
##########
@@ -111,7 +111,7 @@ void setup() throws Exception {
             config.setBrokerServicePort(Optional.of(0));
             config.setAuthorizationEnabled(false);
             config.setAuthenticationEnabled(false);
-            config.setBacklogQuotaCheckIntervalInSeconds(TIME_TO_CHECK_BACKLOG_QUOTA);
+            config.setBacklogQuotaCheckIntervalInSeconds(TIME_TO_CHECK_BACKLOG_QUOTA / 2);

Review Comment:
   It looks that this is a common config,  and this change  maybe break other test case



##########
pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BacklogQuotaManagerTest.java:
##########
@@ -527,6 +527,60 @@ public void testConsumerBacklogEvictionTimeQuota() throws Exception {
         client.close();
     }
 
+    @Test(timeOut = 60000)
+    public void testConsumerBacklogEvictionTimeQuotaWithoutEviction() throws Exception {
+        assertEquals(admin.namespaces().getBacklogQuotaMap("prop/ns-quota"),
+                new HashMap<>());
+        admin.namespaces().setBacklogQuota("prop/ns-quota",
+                BacklogQuota.builder()
+                        .limitTime(5) // set limit time as 5 seconds
+                        .retentionPolicy(BacklogQuota.RetentionPolicy.consumer_backlog_eviction)
+                        .build(), BacklogQuota.BacklogQuotaType.message_age);
+        PulsarClient client = PulsarClient.builder().serviceUrl(adminUrl.toString()).statsInterval(0, TimeUnit.SECONDS)
+                .build();
+
+        final String topic1 = "persistent://prop/ns-quota/topic3" + UUID.randomUUID();
+        final String subName1 = "c1";
+        final String subName2 = "c2";
+        int numMsgs = 5;
+
+        Consumer<byte[]> consumer1 = client.newConsumer().topic(topic1).subscriptionName(subName1).subscribe();
+        Consumer<byte[]> consumer2 = client.newConsumer().topic(topic1).subscriptionName(subName2).subscribe();
+        org.apache.pulsar.client.api.Producer<byte[]> producer = createProducer(client, topic1);
+        byte[] content = new byte[1024];
+        for (int i = 0; i < numMsgs; i++) {
+            producer.send(content);
+            consumer1.receive();
+            consumer2.receive();
+        }
+
+        TopicStats stats = getTopicStats(topic1);
+        assertEquals(stats.getSubscriptions().get(subName1).getMsgBacklog(), 5);
+        assertEquals(stats.getSubscriptions().get(subName2).getMsgBacklog(), 5);
+
+        // Sleep 5000 mills for first 5 messages.
+        Thread.sleep(5000l);
+        numMsgs = 9;
+        for (int i = 0; i < numMsgs; i++) {
+            producer.send(content);
+            consumer1.receive();
+            consumer2.receive();

Review Comment:
   It seems that there is no need for the two line above



##########
pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BacklogQuotaManagerTest.java:
##########
@@ -527,6 +527,60 @@ public void testConsumerBacklogEvictionTimeQuota() throws Exception {
         client.close();
     }
 
+    @Test(timeOut = 60000)
+    public void testConsumerBacklogEvictionTimeQuotaWithoutEviction() throws Exception {
+        assertEquals(admin.namespaces().getBacklogQuotaMap("prop/ns-quota"),
+                new HashMap<>());
+        admin.namespaces().setBacklogQuota("prop/ns-quota",
+                BacklogQuota.builder()
+                        .limitTime(5) // set limit time as 5 seconds
+                        .retentionPolicy(BacklogQuota.RetentionPolicy.consumer_backlog_eviction)
+                        .build(), BacklogQuota.BacklogQuotaType.message_age);
+        PulsarClient client = PulsarClient.builder().serviceUrl(adminUrl.toString()).statsInterval(0, TimeUnit.SECONDS)
+                .build();
+
+        final String topic1 = "persistent://prop/ns-quota/topic3" + UUID.randomUUID();
+        final String subName1 = "c1";
+        final String subName2 = "c2";
+        int numMsgs = 5;
+
+        Consumer<byte[]> consumer1 = client.newConsumer().topic(topic1).subscriptionName(subName1).subscribe();
+        Consumer<byte[]> consumer2 = client.newConsumer().topic(topic1).subscriptionName(subName2).subscribe();
+        org.apache.pulsar.client.api.Producer<byte[]> producer = createProducer(client, topic1);
+        byte[] content = new byte[1024];
+        for (int i = 0; i < numMsgs; i++) {
+            producer.send(content);
+            consumer1.receive();
+            consumer2.receive();
+        }
+
+        TopicStats stats = getTopicStats(topic1);
+        assertEquals(stats.getSubscriptions().get(subName1).getMsgBacklog(), 5);
+        assertEquals(stats.getSubscriptions().get(subName2).getMsgBacklog(), 5);
+
+        // Sleep 5000 mills for first 5 messages.
+        Thread.sleep(5000l);

Review Comment:
   We'd better use '5000L'  instead of '5000l'



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] HQebupt commented on a diff in pull request #17957: [fix][broker]unify time unit at dropping the backlog on a topic

Posted by GitBox <gi...@apache.org>.
HQebupt commented on code in PR #17957:
URL: https://github.com/apache/pulsar/pull/17957#discussion_r989876090


##########
pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BacklogQuotaManagerTest.java:
##########
@@ -111,7 +111,7 @@ void setup() throws Exception {
             config.setBrokerServicePort(Optional.of(0));
             config.setAuthorizationEnabled(false);
             config.setAuthenticationEnabled(false);
-            config.setBacklogQuotaCheckIntervalInSeconds(TIME_TO_CHECK_BACKLOG_QUOTA);
+            config.setBacklogQuotaCheckIntervalInSeconds(TIME_TO_CHECK_BACKLOG_QUOTA / 2);

Review Comment:
   It is fine.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] HQebupt commented on pull request #17957: [fix][broker]unify time unit at dropping the backlog on a topic

Posted by GitBox <gi...@apache.org>.
HQebupt commented on PR #17957:
URL: https://github.com/apache/pulsar/pull/17957#issuecomment-1272497896

   /pulsarbot run-failure-checks


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] HQebupt commented on pull request #17957: [fix][broker]unify time unit at dropping the backlog on a topic

Posted by GitBox <gi...@apache.org>.
HQebupt commented on PR #17957:
URL: https://github.com/apache/pulsar/pull/17957#issuecomment-1272537341

   /pulsarbot run-failure-checks


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] codelipenghui commented on a diff in pull request #17957: [fix][broker]unify time unit at dropping the backlog on a topic

Posted by GitBox <gi...@apache.org>.
codelipenghui commented on code in PR #17957:
URL: https://github.com/apache/pulsar/pull/17957#discussion_r990888430


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BacklogQuotaManager.java:
##########
@@ -220,7 +220,7 @@ private void dropBacklogForTimeLimit(PersistentTopic persistentTopic, BacklogQuo
                     }
                     // Timestamp only > 0 if ledger has been closed
                     if (ledgerInfo.getTimestamp() > 0
-                            && currentMillis - ledgerInfo.getTimestamp() > quota.getLimitTime()) {
+                            && currentMillis - ledgerInfo.getTimestamp() > quota.getLimitTime() * 1000) {

Review Comment:
   It's a confusing name :)
   
   We should deprecate the old name `limitTime` and introduce a new one `limitTimeInSec`
   Not for this PR, we can start a new discussion in the dev mailing list and make it only happen on the next major release
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] HQebupt commented on pull request #17957: [fix][broker]unify time unit at dropping the backlog on a topic

Posted by GitBox <gi...@apache.org>.
HQebupt commented on PR #17957:
URL: https://github.com/apache/pulsar/pull/17957#issuecomment-1272198194

   @AnonHxy PTAL


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] HQebupt commented on pull request #17957: [fix][broker]unify time unit at dropping the backlog on a topic

Posted by GitBox <gi...@apache.org>.
HQebupt commented on PR #17957:
URL: https://github.com/apache/pulsar/pull/17957#issuecomment-1272432609

   /pulsarbot ready-to-test


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org