You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by te...@apache.org on 2022/09/28 14:22:33 UTC
[pulsar] 01/02: [fix][test] Fix flaky test ResourceGroupUsageAggregationTest. testProduceConsumeUsageOnRG (#17617)
This is an automated email from the ASF dual-hosted git repository.
technoboy pushed a commit to branch branch-2.11
in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 39f14bd1b5bdf4c0c0e2916d07234f6c0f51ee36
Author: Jiwei Guo <te...@apache.org>
AuthorDate: Fri Sep 16 15:51:40 2022 +0800
[fix][test] Fix flaky test ResourceGroupUsageAggregationTest. testProduceConsumeUsageOnRG (#17617)
---
.../broker/resourcegroup/ResourceGroupService.java | 11 +++
.../ResourceGroupUsageAggregationTest.java | 105 ++++++++++++---------
2 files changed, 70 insertions(+), 46 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/resourcegroup/ResourceGroupService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/resourcegroup/ResourceGroupService.java
index 79cba28d374..4bb1bc8ab24 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/resourcegroup/ResourceGroupService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/resourcegroup/ResourceGroupService.java
@@ -19,6 +19,7 @@
package org.apache.pulsar.broker.resourcegroup;
import static org.apache.pulsar.common.util.Runnables.catchingAndLoggingThrowables;
+import com.google.common.annotations.VisibleForTesting;
import io.prometheus.client.Counter;
import io.prometheus.client.Summary;
import java.util.Map;
@@ -829,4 +830,14 @@ public class ResourceGroupService {
.name("pulsar_resource_group_calculate_quota_secs")
.help("Time required to calculate quota of all resource groups, in seconds.")
.register();
+
+ @VisibleForTesting // accessor intended for tests; declared without an access modifier (package-private)
+ ConcurrentHashMap getTopicConsumeStats() { // NOTE(review): raw ConcurrentHashMap return type; the field's key/value types are not visible in this hunk, confirm and parameterize
+ return this.topicConsumeStats; // returns the field itself, not a copy, so callers can mutate it (the test in this commit calls clear() on it)
+ }
+
+ @VisibleForTesting // accessor intended for tests; declared without an access modifier (package-private)
+ ConcurrentHashMap getTopicProduceStats() { // NOTE(review): raw ConcurrentHashMap return type; the field's key/value types are not visible in this hunk, confirm and parameterize
+ return this.topicProduceStats; // returns the field itself, not a copy, so callers can mutate it (the test in this commit calls clear() on it)
+ }
}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/resourcegroup/ResourceGroupUsageAggregationTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/resourcegroup/ResourceGroupUsageAggregationTest.java
index a86035b71e0..1fd9271d9bd 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/resourcegroup/ResourceGroupUsageAggregationTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/resourcegroup/ResourceGroupUsageAggregationTest.java
@@ -24,6 +24,8 @@ import org.apache.pulsar.broker.resourcegroup.ResourceGroup.BytesAndMessagesCoun
import org.apache.pulsar.broker.resourcegroup.ResourceGroup.ResourceGroupMonitoringClass;
import org.apache.pulsar.broker.resourcegroup.ResourceGroupService.ResourceGroupUsageStatsType;
import org.apache.pulsar.broker.service.BrokerService;
+import org.apache.pulsar.broker.service.Topic;
+import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.broker.service.resource.usage.ResourceUsage;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.Consumer;
@@ -37,16 +39,16 @@ import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.TenantInfoImpl;
import org.apache.pulsar.common.policies.data.stats.TopicStatsImpl;
+import org.awaitility.Awaitility;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
-
-import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
@Slf4j
-@Test(groups = "flaky")
public class ResourceGroupUsageAggregationTest extends ProducerConsumerBase {
@BeforeClass
@Override
@@ -120,10 +122,11 @@ public class ResourceGroupUsageAggregationTest extends ProducerConsumerBase {
.create();
Consumer<byte[]> consumer = null;
+ String subscriptionName = "my-subscription";
try {
consumer = pulsarClient.newConsumer()
.topic(topicString)
- .subscriptionName("my-subscription")
+ .subscriptionName(subscriptionName)
.subscriptionType(SubscriptionType.Shared)
.subscribe();
} catch (PulsarClientException p) {
@@ -176,6 +179,20 @@ public class ResourceGroupUsageAggregationTest extends ProducerConsumerBase {
true, true);
consumer.close();
+ // cleanup the topic data.
+ CompletableFuture<Optional<Topic>> topicFuture = pulsar.getBrokerService().getTopics().remove(topicString);
+ if (topicFuture != null) {
+ Optional<Topic> optTopic = topicFuture.join();
+ if (optTopic.isPresent()) {
+ Topic topic = optTopic.get();
+ if (topic instanceof PersistentTopic) {
+ PersistentTopic persistentTopic = (PersistentTopic) topic;
+ persistentTopic.getSubscription(subscriptionName).deleteForcefully();
+ }
+ }
+ }
+ rgs.getTopicConsumeStats().clear();
+ rgs.getTopicProduceStats().clear();
rgs.unRegisterTenant(activeRgName, tenantString);
rgs.unRegisterNameSpace(activeRgName, NamespaceName.get(nsString));
@@ -192,48 +209,44 @@ public class ResourceGroupUsageAggregationTest extends ProducerConsumerBase {
boolean checkProduce, boolean checkConsume)
throws InterruptedException, PulsarAdminException {
BrokerService bs = pulsar.getBrokerService();
- Map<String, TopicStatsImpl> topicStatsMap = bs.getTopicStats();
- for (Map.Entry<String, TopicStatsImpl> entry : topicStatsMap.entrySet()) {
- String mapTopicName = entry.getKey();
- if (mapTopicName.equals(topicString)) {
- TopicStatsImpl stats = entry.getValue();
- if (checkProduce) {
- Assert.assertTrue(stats.bytesInCounter >= sentNumBytes);
- Assert.assertEquals(sentNumMsgs, stats.msgInCounter);
- }
- if (checkConsume) {
- Assert.assertTrue(stats.bytesOutCounter >= recvdNumBytes);
- Assert.assertEquals(recvdNumMsgs, stats.msgOutCounter);
- }
-
- if (sentNumMsgs > 0 || recvdNumMsgs > 0) {
- rgs.aggregateResourceGroupLocalUsages(); // hack to ensure aggregator calculation without waiting
- BytesAndMessagesCount prodCounts = rgs.getRGUsage(rgName, ResourceGroupMonitoringClass.Publish,
- ResourceGroupUsageStatsType.Cumulative);
- BytesAndMessagesCount consCounts = rgs.getRGUsage(rgName, ResourceGroupMonitoringClass.Dispatch,
- ResourceGroupUsageStatsType.Cumulative);
-
- // Re-do the getRGUsage.
- // The counts should be equal, since there wasn't any intervening traffic on TEST_PRODUCE_CONSUME_TOPIC.
- BytesAndMessagesCount prodCounts1 = rgs.getRGUsage(rgName, ResourceGroupMonitoringClass.Publish,
- ResourceGroupUsageStatsType.Cumulative);
- BytesAndMessagesCount consCounts1 = rgs.getRGUsage(rgName, ResourceGroupMonitoringClass.Dispatch,
- ResourceGroupUsageStatsType.Cumulative);
-
- Assert.assertEquals(prodCounts1.bytes, prodCounts.bytes);
- Assert.assertEquals(prodCounts1.messages, prodCounts.messages);
- Assert.assertEquals(consCounts1.bytes, consCounts.bytes);
- Assert.assertEquals(consCounts1.messages, consCounts.messages);
-
- if (checkProduce) {
- Assert.assertTrue(prodCounts.bytes >= sentNumBytes);
- Assert.assertEquals(sentNumMsgs, prodCounts.messages);
- }
- if (checkConsume) {
- Assert.assertTrue(consCounts.bytes >= recvdNumBytes);
- Assert.assertEquals(recvdNumMsgs, consCounts.messages);
- }
- }
+ Awaitility.await().untilAsserted(() -> {
+ TopicStatsImpl topicStats = bs.getTopicStats().get(topicString);
+ Assert.assertNotNull(topicStats);
+ if (checkProduce) {
+ Assert.assertTrue(topicStats.bytesInCounter >= sentNumBytes);
+ Assert.assertEquals(sentNumMsgs, topicStats.msgInCounter);
+ }
+ if (checkConsume) {
+ Assert.assertTrue(topicStats.bytesOutCounter >= recvdNumBytes);
+ Assert.assertEquals(recvdNumMsgs, topicStats.msgOutCounter);
+ }
+ });
+ if (sentNumMsgs > 0 || recvdNumMsgs > 0) {
+ rgs.aggregateResourceGroupLocalUsages(); // hack to ensure aggregator calculation without waiting
+ BytesAndMessagesCount prodCounts = rgs.getRGUsage(rgName, ResourceGroupMonitoringClass.Publish,
+ ResourceGroupUsageStatsType.Cumulative);
+ BytesAndMessagesCount consCounts = rgs.getRGUsage(rgName, ResourceGroupMonitoringClass.Dispatch,
+ ResourceGroupUsageStatsType.Cumulative);
+
+ // Re-do the getRGUsage.
+ // The counts should be equal, since there wasn't any intervening traffic on TEST_PRODUCE_CONSUME_TOPIC.
+ BytesAndMessagesCount prodCounts1 = rgs.getRGUsage(rgName, ResourceGroupMonitoringClass.Publish,
+ ResourceGroupUsageStatsType.Cumulative);
+ BytesAndMessagesCount consCounts1 = rgs.getRGUsage(rgName, ResourceGroupMonitoringClass.Dispatch,
+ ResourceGroupUsageStatsType.Cumulative);
+
+ Assert.assertEquals(prodCounts1.bytes, prodCounts.bytes);
+ Assert.assertEquals(prodCounts1.messages, prodCounts.messages);
+ Assert.assertEquals(consCounts1.bytes, consCounts.bytes);
+ Assert.assertEquals(consCounts1.messages, consCounts.messages);
+
+ if (checkProduce) {
+ Assert.assertTrue(prodCounts.bytes >= sentNumBytes);
+ Assert.assertEquals(sentNumMsgs, prodCounts.messages);
+ }
+ if (checkConsume) {
+ Assert.assertTrue(consCounts.bytes >= recvdNumBytes);
+ Assert.assertEquals(recvdNumMsgs, consCounts.messages);
}
}
}