You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by te...@apache.org on 2023/03/12 05:41:04 UTC
[pulsar] branch branch-2.11 updated: [fix][broker] Fix issue where msgRateExpired may not refresh forever (#19759)
This is an automated email from the ASF dual-hosted git repository.
technoboy pushed a commit to branch branch-2.11
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-2.11 by this push:
new 93b83e8b1d8 [fix][broker] Fix issue where msgRateExpired may not refresh forever (#19759)
93b83e8b1d8 is described below
commit 93b83e8b1d803a2252548d69729c7f81bb1091e9
Author: Masahiro Sakamoto <ma...@yahoo-corp.jp>
AuthorDate: Sun Mar 12 14:39:30 2023 +0900
[fix][broker] Fix issue where msgRateExpired may not refresh forever (#19759)
---
.../service/persistent/PersistentReplicator.java | 2 ++
.../broker/service/persistent/PersistentTopic.java | 1 +
.../client/api/SimpleProducerConsumerStatTest.java | 41 ++++++++++++++++++++++
3 files changed, 44 insertions(+)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java
index 2e7dbb2fbf1..38a433b7629 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java
@@ -683,6 +683,8 @@ public class PersistentReplicator extends AbstractReplicator
public void updateRates() {
msgOut.calculateRate();
msgExpired.calculateRate();
+ expiryMonitor.updateRates();
+
stats.msgRateOut = msgOut.getRate();
stats.msgThroughputOut = msgOut.getValueRate();
stats.msgRateExpired = msgExpired.getRate() + expiryMonitor.getMessageExpiryRate();
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index 4c7b7bb979c..6213db88f80 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -1772,6 +1772,7 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
// Populate subscription specific stats here
topicStatsStream.writePair("msgBacklog",
subscription.getNumberOfEntriesInBacklog(true));
+ subscription.getExpiryMonitor().updateRates();
topicStatsStream.writePair("msgRateExpired", subscription.getExpiredMessageRate());
topicStatsStream.writePair("msgRateOut", subMsgRateOut);
topicStatsStream.writePair("messageAckRate", subMsgAckRate);
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerStatTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerStatTest.java
index 0adeca4bcdb..55d2d812186 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerStatTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerStatTest.java
@@ -515,4 +515,45 @@ public class SimpleProducerConsumerStatTest extends ProducerConsumerBase {
log.info("-- Exiting {} test --", methodName);
}
+
+ @Test // Checks that a subscription's msgRateExpired becomes positive after expiry and later drops back to zero.
+ public void testMsgRateExpired() throws Exception {
+ log.info("-- Starting {} test --", methodName);
+ // Subscribe at MessageId.latest before producing; presumably so every message sent below lands in this backlog.
+ String topicName = "persistent://my-property/tp1/my-ns/" + methodName;
+ String subName = "my-sub";
+ admin.topics().createSubscription(topicName, subName, MessageId.latest);
+ // NOTE(review): @Cleanup is presumably Lombok's and closes the producer when the method exits - confirm.
+ @Cleanup
+ Producer<byte[]> producer = pulsarClient.newProducer()
+ .topic(topicName)
+ .enableBatching(false)
+ .create();
+ // Publish numMessages individual messages (batching is disabled on the producer above).
+ int numMessages = 100;
+ for (int i = 0; i < numMessages; i++) {
+ String message = "my-message-" + i;
+ producer.send(message.getBytes());
+ }
+ // Sleep 2s, then expire with argument 1 (presumably an age threshold in seconds - confirm against the admin API).
+ Thread.sleep(2000);
+ admin.topics().expireMessages(topicName, subName, 1);
+ pulsar.getBrokerService().updateRates();
+ // After the forced rate update, the reported expiry rate must exceed 0.001 within 5 seconds.
+ Awaitility.await().ignoreExceptions().timeout(5, TimeUnit.SECONDS)
+ .until(() -> admin.topics().getStats(topicName).getSubscriptions().get(subName).getMsgRateExpired() > 0.001);
+ // Sleep again and force another rate update; no further expiry is requested in this window.
+ Thread.sleep(2000);
+ pulsar.getBrokerService().updateRates();
+ // The reported rate must now fall below 0.001 within 5 seconds instead of staying at its earlier value.
+ Awaitility.await().ignoreExceptions().timeout(5, TimeUnit.SECONDS)
+ .until(() -> admin.topics().getStats(topicName).getSubscriptions().get(subName).getMsgRateExpired() < 0.001);
+ // Final state: rate equals 0.0 within a 0.001 tolerance, and the total expired count equals numMessages.
+ assertEquals(admin.topics().getStats(topicName).getSubscriptions().get(subName).getMsgRateExpired(), 0.0,
+ 0.001);
+ assertEquals(admin.topics().getStats(topicName).getSubscriptions().get(subName).getTotalMsgExpired(),
+ numMessages);
+
+ log.info("-- Exiting {} test --", methodName);
+ }
}