You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by lh...@apache.org on 2022/04/29 03:38:21 UTC

[pulsar] branch master updated: [Test] Use precise backlog when getting stats in the test. (#14489)

This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new d8b99485834 [Test] Use precise backlog when getting stats in the test. (#14489)
d8b99485834 is described below

commit d8b994858343d5161e3396a0cab359767626cbb6
Author: Zike Yang <zk...@streamnative.io>
AuthorDate: Fri Apr 29 11:38:16 2022 +0800

    [Test] Use precise backlog when getting stats in the test. (#14489)
    
    * [Test] Use precise backlog when getting stats in the test.
    
    Motivation
    In `BacklogQuotaManagerTest`, the message backlog in the stats obtained from the admin client may not be precise, which makes the tests flaky. The tests need to request the precise backlog when getting stats.
    
    Modification
    * Use precise backlog when getting stats in the test.
    
    Signed-off-by: Zike Yang <zk...@streamnative.io>
    
    * Extract method
    
    Co-authored-by: Lari Hotari <lh...@apache.org>
---
 .../broker/service/BacklogQuotaManagerTest.java    | 66 ++++++++++++----------
 1 file changed, 36 insertions(+), 30 deletions(-)

diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BacklogQuotaManagerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BacklogQuotaManagerTest.java
index f36d0d5dd1d..af431e80b30 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BacklogQuotaManagerTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BacklogQuotaManagerTest.java
@@ -39,7 +39,9 @@ import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
 import org.apache.pulsar.broker.PulsarService;
 import org.apache.pulsar.broker.ServiceConfiguration;
 import org.apache.pulsar.broker.service.persistent.PersistentTopic;
+import org.apache.pulsar.client.admin.GetStatsOptions;
 import org.apache.pulsar.client.admin.PulsarAdmin;
+import org.apache.pulsar.client.admin.PulsarAdminException;
 import org.apache.pulsar.client.api.Consumer;
 import org.apache.pulsar.client.api.Message;
 import org.apache.pulsar.client.api.MessageId;
@@ -180,7 +182,7 @@ public class BacklogQuotaManagerTest {
 
             rolloverStats();
 
-            TopicStats stats = admin.topics().getStats(topic1);
+            TopicStats stats = getTopicStats(topic1);
 
             // overall backlogSize should be zero because we only have readers
             assertEquals(stats.getBacklogSize(), 0, "backlog size is [" + stats.getBacklogSize() + "]");
@@ -229,6 +231,12 @@ public class BacklogQuotaManagerTest {
         }
     }
 
+    private TopicStats getTopicStats(String topic1) throws PulsarAdminException {
+        // Request the precise backlog; the default stats may report an imprecise backlog.
+        GetStatsOptions preciseBacklog = GetStatsOptions.builder().getPreciseBacklog(true).build();
+        return admin.topics().getStats(topic1, preciseBacklog);
+    }
+
     @Test
     public void testTriggerBacklogQuotaSizeWithReader() throws Exception {
         assertEquals(admin.namespaces().getBacklogQuotaMap("prop/ns-quota"),
@@ -252,7 +260,7 @@ public class BacklogQuotaManagerTest {
             Thread.sleep(TIME_TO_CHECK_BACKLOG_QUOTA * 1000);
             admin.brokers().backlogQuotaCheck();
             rolloverStats();
-            TopicStats stats = admin.topics().getStats(topic1);
+            TopicStats stats = getTopicStats(topic1);
             // overall backlogSize should be zero because we only have readers
             assertEquals(stats.getBacklogSize(), 0, "backlog size is [" + stats.getBacklogSize() + "]");
             // non-durable mes should still
@@ -322,7 +330,7 @@ public class BacklogQuotaManagerTest {
             Thread.sleep(TIME_TO_CHECK_BACKLOG_QUOTA * 1000);
             admin.brokers().backlogQuotaCheck();
             rolloverStats();
-            TopicStats stats = admin.topics().getStats(topic1);
+            TopicStats stats = getTopicStats(topic1);
             // overall backlogSize should be zero because we only have readers
             assertEquals(stats.getBacklogSize(), 0, "backlog size is [" + stats.getBacklogSize() + "]");
             // non-durable mes should still
@@ -397,7 +405,7 @@ public class BacklogQuotaManagerTest {
         Thread.sleep((TIME_TO_CHECK_BACKLOG_QUOTA + 1) * 1000);
         rolloverStats();
 
-        TopicStats stats = admin.topics().getStats(topic1);
+        TopicStats stats = getTopicStats(topic1);
         assertTrue(stats.getBacklogSize() < 10 * 1024, "Storage size is [" + stats.getStorageSize() + "]");
     }
 
@@ -429,14 +437,14 @@ public class BacklogQuotaManagerTest {
             consumer2.receive();
         }
 
-        TopicStats stats = admin.topics().getStats(topic1);
+        TopicStats stats = getTopicStats(topic1);
         assertEquals(stats.getSubscriptions().get(subName1).getMsgBacklog(), 9);
         assertEquals(stats.getSubscriptions().get(subName2).getMsgBacklog(), 9);
 
         Thread.sleep((TIME_TO_CHECK_BACKLOG_QUOTA * 2) * 1000);
         rolloverStats();
 
-        stats = admin.topics().getStats(topic1);
+        stats = getTopicStats(topic1);
         // All messages for both subscription should be cleaned up from backlog by backlog monitor task.
         assertEquals(stats.getSubscriptions().get(subName1).getMsgBacklog(), 0);
         assertEquals(stats.getSubscriptions().get(subName2).getMsgBacklog(), 0);
@@ -470,14 +478,14 @@ public class BacklogQuotaManagerTest {
             consumer2.receive();
         }
 
-        TopicStats stats = admin.topics().getStats(topic1);
+        TopicStats stats = getTopicStats(topic1);
         assertEquals(stats.getSubscriptions().get(subName1).getMsgBacklog(), 14);
         assertEquals(stats.getSubscriptions().get(subName2).getMsgBacklog(), 14);
 
         Thread.sleep((TIME_TO_CHECK_BACKLOG_QUOTA * 2) * 1000);
         rolloverStats();
 
-        stats = admin.topics().getStats(topic1);
+        stats = getTopicStats(topic1);
         PersistentTopic topic1Reference = (PersistentTopic) pulsar.getBrokerService().getTopicReference(topic1).get();
         ManagedLedgerImpl ml = (ManagedLedgerImpl) topic1Reference.getManagedLedger();
         // Messages on first 2 ledgers should be expired, backlog is number of
@@ -513,7 +521,7 @@ public class BacklogQuotaManagerTest {
         assertEquals(internalStats.ledgers.size(), 2);
         assertEquals(internalStats.ledgers.get(1).entries, 0);
 
-        TopicStats stats = admin.topics().getStats(topic);
+        TopicStats stats = getTopicStats(topic);
         assertEquals(stats.getSubscriptions().get(subName).getMsgBacklog(), 1);
 
         TimeUnit.SECONDS.sleep(TIME_TO_CHECK_BACKLOG_QUOTA);
@@ -529,7 +537,7 @@ public class BacklogQuotaManagerTest {
                     PersistentTopicInternalStats latestInternalStats = admin.topics().getInternalStats(topic);
                     assertEquals(latestInternalStats.ledgers.size(), 2);
                     assertEquals(latestInternalStats.ledgers.get(1).entries, 0);
-                    TopicStats latestStats = admin.topics().getStats(topic);
+                    TopicStats latestStats = getTopicStats(topic);
                     assertEquals(latestStats.getSubscriptions().get(subName).getMsgBacklog(), 0);
                 });
 
@@ -567,7 +575,7 @@ public class BacklogQuotaManagerTest {
         Thread.sleep((TIME_TO_CHECK_BACKLOG_QUOTA + 1) * 1000);
         rolloverStats();
 
-        TopicStats stats = admin.topics().getStats(topic1);
+        TopicStats stats = getTopicStats(topic1);
         assertTrue(stats.getBacklogSize() <= 10 * 1024, "Storage size is [" + stats.getStorageSize() + "]");
     }
 
@@ -599,7 +607,7 @@ public class BacklogQuotaManagerTest {
             consumer2.receive();
         }
 
-        TopicStats stats = admin.topics().getStats(topic1);
+        TopicStats stats = getTopicStats(topic1);
         assertEquals(stats.getSubscriptions().get(subName1).getMsgBacklog(), 9);
         assertEquals(stats.getSubscriptions().get(subName2).getMsgBacklog(), 9);
 
@@ -611,7 +619,7 @@ public class BacklogQuotaManagerTest {
 
         Thread.sleep(1000);
         rolloverStats();
-        stats = admin.topics().getStats(topic1);
+        stats = getTopicStats(topic1);
         // sub1 has empty backlog as it acked all messages
         assertEquals(stats.getSubscriptions().get(subName1).getMsgBacklog(), 0);
         assertEquals(stats.getSubscriptions().get(subName2).getMsgBacklog(), 9);
@@ -619,7 +627,7 @@ public class BacklogQuotaManagerTest {
         Thread.sleep((TIME_TO_CHECK_BACKLOG_QUOTA * 2) * 1000);
         rolloverStats();
 
-        stats = admin.topics().getStats(topic1);
+        stats = getTopicStats(topic1);
         // sub2 has empty backlog because it's backlog get cleaned up by backlog quota monitor task
         assertEquals(stats.getSubscriptions().get(subName2).getMsgBacklog(), 0);
         client.close();
@@ -665,7 +673,7 @@ public class BacklogQuotaManagerTest {
         }
 
         {
-            TopicStats stats = admin.topics().getStats(topic1);
+            TopicStats stats = getTopicStats(topic1);
             assertEquals(stats.getSubscriptions().get(subName1).getMsgBacklog(), 14);
             assertEquals(stats.getSubscriptions().get(subName2).getMsgBacklog(), 14);
         }
@@ -684,7 +692,7 @@ public class BacklogQuotaManagerTest {
                 .pollInterval(Duration.ofSeconds(1))
                 .untilAsserted(() -> {
                     rolloverStats();
-                    TopicStats stats = admin.topics().getStats(topic1);
+                    TopicStats stats = getTopicStats(topic1);
                     // sub1 has empty backlog as it acked all messages
                     assertEquals(stats.getSubscriptions().get(subName1).getMsgBacklog(), 0);
                     assertEquals(stats.getSubscriptions().get(subName2).getMsgBacklog(), 14);
@@ -696,11 +704,9 @@ public class BacklogQuotaManagerTest {
                 .untilAsserted(() -> {
                     // Messages on first 2 ledgers should be expired, backlog is number of
                     // message in current ledger which should be 4.
-                    long msgBacklog = admin.topics().getStats(topic1).getSubscriptions().get(subName2).getMsgBacklog();
-                    // TODO: for some reason the backlog size is sometimes off by one
-                    // Internally there's a method `long getNumberOfEntriesInBacklog(boolean getPreciseBacklog)`
-                    // on org.apache.pulsar.broker.service.Subscription interface
-                    // the `boolean getPreciseBacklog` parameter indicates that the backlog size isn't accurate
+                    long msgBacklog =
+                            getTopicStats(topic1)
+                                    .getSubscriptions().get(subName2).getMsgBacklog();
                     assertEquals(msgBacklog, 4, 1);
                 });
     }
@@ -772,7 +778,7 @@ public class BacklogQuotaManagerTest {
         Thread.sleep((TIME_TO_CHECK_BACKLOG_QUOTA + 1) * 1000);
         rolloverStats();
 
-        TopicStats stats = admin.topics().getStats(topic1);
+        TopicStats stats = getTopicStats(topic1);
         assertTrue(stats.getBacklogSize() <= 10 * 1024, "Storage size is [" + stats.getStorageSize() + "]");
     }
 
@@ -938,7 +944,7 @@ public class BacklogQuotaManagerTest {
         Thread.sleep((TIME_TO_CHECK_BACKLOG_QUOTA + 1) * 1000);
         rolloverStats();
 
-        TopicStats stats = admin.topics().getStats(topic1);
+        TopicStats stats = getTopicStats(topic1);
         assertTrue(stats.getBacklogSize() <= 15 * 1024, "Storage size is [" + stats.getStorageSize() + "]");
     }
 
@@ -979,7 +985,7 @@ public class BacklogQuotaManagerTest {
 
         Thread.sleep((TIME_TO_CHECK_BACKLOG_QUOTA + 1) * 1000);
         rolloverStats();
-        TopicStats stats = admin.topics().getStats(topic1);
+        TopicStats stats = getTopicStats(topic1);
         assertEquals(stats.getPublishers().size(), 0,
                 "Number of producers on topic " + topic1 + " are [" + stats.getPublishers().size() + "]");
     }
@@ -1102,7 +1108,7 @@ public class BacklogQuotaManagerTest {
         assertTrue(gotException, "backlog exceeded exception did not occur");
         // now remove backlog and ensure that producer is unblocked;
 
-        TopicStats stats = admin.topics().getStats(topic1);
+        TopicStats stats = getTopicStats(topic1);
         int backlog = (int) stats.getSubscriptions().get(subName1).getMsgBacklog();
 
         for (int i = 0; i < backlog; i++) {
@@ -1166,7 +1172,7 @@ public class BacklogQuotaManagerTest {
         assertTrue(gotException, "backlog exceeded exception did not occur");
 
         // now remove backlog and ensure that producer is unblocked;
-        TopicStats stats = admin.topics().getStats(topic1);
+        TopicStats stats = getTopicStats(topic1);
         assertEquals(stats.getSubscriptions().get(subName1).getMsgBacklog(), numMsgs);
 
         for (int i = 0; i < numMsgs; i++) {
@@ -1175,7 +1181,7 @@ public class BacklogQuotaManagerTest {
 
         Thread.sleep((TIME_TO_CHECK_BACKLOG_QUOTA * 2) * 1000);
         rolloverStats();
-        stats = admin.topics().getStats(topic1);
+        stats = getTopicStats(topic1);
         assertEquals(stats.getSubscriptions().get(subName1).getMsgBacklog(), 0);
         // publish should work now
         Exception sendException = null;
@@ -1231,7 +1237,7 @@ public class BacklogQuotaManagerTest {
         assertTrue(gotException, "backlog exceeded exception did not occur");
 
         // now remove backlog and ensure that producer is unblocked;
-        TopicStats stats = admin.topics().getStats(topic1);
+        TopicStats stats = getTopicStats(topic1);
         assertEquals(stats.getSubscriptions().get(subName1).getMsgBacklog(), numMsgs);
 
         for (int i = 0; i < numMsgs; i++) {
@@ -1240,7 +1246,7 @@ public class BacklogQuotaManagerTest {
 
         Thread.sleep((TIME_TO_CHECK_BACKLOG_QUOTA * 2) * 1000);
         rolloverStats();
-        stats = admin.topics().getStats(topic1);
+        stats = getTopicStats(topic1);
         assertEquals(stats.getSubscriptions().get(subName1).getMsgBacklog(), 0);
         // publish should work now
         Exception sendException = null;
@@ -1293,7 +1299,7 @@ public class BacklogQuotaManagerTest {
         Thread.sleep((TIME_TO_CHECK_BACKLOG_QUOTA + 1) * 1000);
         rolloverStats();
 
-        TopicStats stats = admin.topics().getStats(topic1);
+        TopicStats stats = getTopicStats(topic1);
         assertTrue(stats.getBacklogSize() < 10 * 1024, "Storage size is [" + stats.getStorageSize() + "]");
     }
     private static final Logger LOG = LoggerFactory.getLogger(BacklogQuotaManagerTest.class);