You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by te...@apache.org on 2022/09/28 14:22:33 UTC

[pulsar] 01/02: [fix][test] Fix flaky test ResourceGroupUsageAggregationTest.testProduceConsumeUsageOnRG (#17617)

This is an automated email from the ASF dual-hosted git repository.

technoboy pushed a commit to branch branch-2.11
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 39f14bd1b5bdf4c0c0e2916d07234f6c0f51ee36
Author: Jiwei Guo <te...@apache.org>
AuthorDate: Fri Sep 16 15:51:40 2022 +0800

    [fix][test] Fix flaky test ResourceGroupUsageAggregationTest.testProduceConsumeUsageOnRG (#17617)
---
 .../broker/resourcegroup/ResourceGroupService.java |  11 +++
 .../ResourceGroupUsageAggregationTest.java         | 105 ++++++++++++---------
 2 files changed, 70 insertions(+), 46 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/resourcegroup/ResourceGroupService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/resourcegroup/ResourceGroupService.java
index 79cba28d374..4bb1bc8ab24 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/resourcegroup/ResourceGroupService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/resourcegroup/ResourceGroupService.java
@@ -19,6 +19,7 @@
 package org.apache.pulsar.broker.resourcegroup;
 
 import static org.apache.pulsar.common.util.Runnables.catchingAndLoggingThrowables;
+import com.google.common.annotations.VisibleForTesting;
 import io.prometheus.client.Counter;
 import io.prometheus.client.Summary;
 import java.util.Map;
@@ -829,4 +830,14 @@ public class ResourceGroupService {
             .name("pulsar_resource_group_calculate_quota_secs")
             .help("Time required to calculate quota of all resource groups, in seconds.")
             .register();
+
+    @VisibleForTesting // test-only accessor: hands back the live consume-stats map, not a copy
+    ConcurrentHashMap<?, ?> getTopicConsumeStats() {
+        return this.topicConsumeStats;
+    }
+
+    @VisibleForTesting // test-only accessor: hands back the live produce-stats map, not a copy
+    ConcurrentHashMap<?, ?> getTopicProduceStats() {
+        return this.topicProduceStats;
+    }
 }
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/resourcegroup/ResourceGroupUsageAggregationTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/resourcegroup/ResourceGroupUsageAggregationTest.java
index a86035b71e0..1fd9271d9bd 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/resourcegroup/ResourceGroupUsageAggregationTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/resourcegroup/ResourceGroupUsageAggregationTest.java
@@ -24,6 +24,8 @@ import org.apache.pulsar.broker.resourcegroup.ResourceGroup.BytesAndMessagesCoun
 import org.apache.pulsar.broker.resourcegroup.ResourceGroup.ResourceGroupMonitoringClass;
 import org.apache.pulsar.broker.resourcegroup.ResourceGroupService.ResourceGroupUsageStatsType;
 import org.apache.pulsar.broker.service.BrokerService;
+import org.apache.pulsar.broker.service.Topic;
+import org.apache.pulsar.broker.service.persistent.PersistentTopic;
 import org.apache.pulsar.broker.service.resource.usage.ResourceUsage;
 import org.apache.pulsar.client.admin.PulsarAdminException;
 import org.apache.pulsar.client.api.Consumer;
@@ -37,16 +39,16 @@ import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.policies.data.ClusterData;
 import org.apache.pulsar.common.policies.data.TenantInfoImpl;
 import org.apache.pulsar.common.policies.data.stats.TopicStatsImpl;
+import org.awaitility.Awaitility;
 import org.testng.Assert;
 import org.testng.annotations.AfterClass;
 import org.testng.annotations.BeforeClass;
 import org.testng.annotations.Test;
-
-import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
 
 @Slf4j
-@Test(groups = "flaky")
 public class ResourceGroupUsageAggregationTest extends ProducerConsumerBase {
     @BeforeClass
     @Override
@@ -120,10 +122,11 @@ public class ResourceGroupUsageAggregationTest extends ProducerConsumerBase {
                 .create();
 
         Consumer<byte[]> consumer = null;
+        String subscriptionName = "my-subscription";
         try {
             consumer = pulsarClient.newConsumer()
                     .topic(topicString)
-                    .subscriptionName("my-subscription")
+                    .subscriptionName(subscriptionName)
                     .subscriptionType(SubscriptionType.Shared)
                     .subscribe();
         } catch (PulsarClientException p) {
@@ -176,6 +179,20 @@ public class ResourceGroupUsageAggregationTest extends ProducerConsumerBase {
                 true, true);
 
         consumer.close();
+        // cleanup the topic data.
+        CompletableFuture<Optional<Topic>> topicFuture = pulsar.getBrokerService().getTopics().remove(topicString);
+        if (topicFuture != null) {
+            Optional<Topic> optTopic = topicFuture.join();
+            if (optTopic.isPresent()) {
+                Topic topic = optTopic.get();
+                if (topic instanceof PersistentTopic) {
+                    PersistentTopic persistentTopic = (PersistentTopic) topic;
+                    persistentTopic.getSubscription(subscriptionName).deleteForcefully();
+                }
+            }
+        }
+        rgs.getTopicConsumeStats().clear();
+        rgs.getTopicProduceStats().clear();
 
         rgs.unRegisterTenant(activeRgName, tenantString);
         rgs.unRegisterNameSpace(activeRgName, NamespaceName.get(nsString));
@@ -192,48 +209,44 @@ public class ResourceGroupUsageAggregationTest extends ProducerConsumerBase {
                              boolean checkProduce, boolean checkConsume)
                                                                 throws InterruptedException, PulsarAdminException {
         BrokerService bs = pulsar.getBrokerService();
-        Map<String, TopicStatsImpl> topicStatsMap = bs.getTopicStats();
-        for (Map.Entry<String, TopicStatsImpl> entry : topicStatsMap.entrySet()) {
-            String mapTopicName = entry.getKey();
-            if (mapTopicName.equals(topicString)) {
-                TopicStatsImpl stats = entry.getValue();
-                if (checkProduce) {
-                    Assert.assertTrue(stats.bytesInCounter >= sentNumBytes);
-                    Assert.assertEquals(sentNumMsgs, stats.msgInCounter);
-                }
-                if (checkConsume) {
-                    Assert.assertTrue(stats.bytesOutCounter >= recvdNumBytes);
-                    Assert.assertEquals(recvdNumMsgs, stats.msgOutCounter);
-                }
-
-                if (sentNumMsgs > 0 || recvdNumMsgs > 0) {
-                    rgs.aggregateResourceGroupLocalUsages();  // hack to ensure aggregator calculation without waiting
-                    BytesAndMessagesCount prodCounts = rgs.getRGUsage(rgName, ResourceGroupMonitoringClass.Publish,
-                            ResourceGroupUsageStatsType.Cumulative);
-                    BytesAndMessagesCount consCounts = rgs.getRGUsage(rgName, ResourceGroupMonitoringClass.Dispatch,
-                            ResourceGroupUsageStatsType.Cumulative);
-
-                    // Re-do the getRGUsage.
-                    // The counts should be equal, since there wasn't any intervening traffic on TEST_PRODUCE_CONSUME_TOPIC.
-                    BytesAndMessagesCount prodCounts1 = rgs.getRGUsage(rgName, ResourceGroupMonitoringClass.Publish,
-                            ResourceGroupUsageStatsType.Cumulative);
-                    BytesAndMessagesCount consCounts1 = rgs.getRGUsage(rgName, ResourceGroupMonitoringClass.Dispatch,
-                            ResourceGroupUsageStatsType.Cumulative);
-
-                    Assert.assertEquals(prodCounts1.bytes, prodCounts.bytes);
-                    Assert.assertEquals(prodCounts1.messages, prodCounts.messages);
-                    Assert.assertEquals(consCounts1.bytes, consCounts.bytes);
-                    Assert.assertEquals(consCounts1.messages, consCounts.messages);
-
-                    if (checkProduce) {
-                        Assert.assertTrue(prodCounts.bytes >= sentNumBytes);
-                        Assert.assertEquals(sentNumMsgs, prodCounts.messages);
-                    }
-                    if (checkConsume) {
-                        Assert.assertTrue(consCounts.bytes >= recvdNumBytes);
-                        Assert.assertEquals(recvdNumMsgs, consCounts.messages);
-                    }
-                }
+        Awaitility.await().untilAsserted(() -> {
+            TopicStatsImpl topicStats = bs.getTopicStats().get(topicString);
+            Assert.assertNotNull(topicStats);
+            if (checkProduce) {
+                Assert.assertTrue(topicStats.bytesInCounter >= sentNumBytes);
+                Assert.assertEquals(sentNumMsgs, topicStats.msgInCounter);
+            }
+            if (checkConsume) {
+                Assert.assertTrue(topicStats.bytesOutCounter >= recvdNumBytes);
+                Assert.assertEquals(recvdNumMsgs, topicStats.msgOutCounter);
+            }
+        });
+        if (sentNumMsgs > 0 || recvdNumMsgs > 0) {
+            rgs.aggregateResourceGroupLocalUsages();  // hack to ensure aggregator calculation without waiting
+            BytesAndMessagesCount prodCounts = rgs.getRGUsage(rgName, ResourceGroupMonitoringClass.Publish,
+                    ResourceGroupUsageStatsType.Cumulative);
+            BytesAndMessagesCount consCounts = rgs.getRGUsage(rgName, ResourceGroupMonitoringClass.Dispatch,
+                    ResourceGroupUsageStatsType.Cumulative);
+
+            // Re-do the getRGUsage.
+            // The counts should be equal, since there wasn't any intervening traffic on TEST_PRODUCE_CONSUME_TOPIC.
+            BytesAndMessagesCount prodCounts1 = rgs.getRGUsage(rgName, ResourceGroupMonitoringClass.Publish,
+                    ResourceGroupUsageStatsType.Cumulative);
+            BytesAndMessagesCount consCounts1 = rgs.getRGUsage(rgName, ResourceGroupMonitoringClass.Dispatch,
+                    ResourceGroupUsageStatsType.Cumulative);
+
+            Assert.assertEquals(prodCounts1.bytes, prodCounts.bytes);
+            Assert.assertEquals(prodCounts1.messages, prodCounts.messages);
+            Assert.assertEquals(consCounts1.bytes, consCounts.bytes);
+            Assert.assertEquals(consCounts1.messages, consCounts.messages);
+
+            if (checkProduce) {
+                Assert.assertTrue(prodCounts.bytes >= sentNumBytes);
+                Assert.assertEquals(sentNumMsgs, prodCounts.messages);
+            }
+            if (checkConsume) {
+                Assert.assertTrue(consCounts.bytes >= recvdNumBytes);
+                Assert.assertEquals(recvdNumMsgs, consCounts.messages);
             }
         }
     }