You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by lh...@apache.org on 2023/10/14 10:31:35 UTC
[pulsar] branch master updated: [improve][broker] Optimize and clean up aggregation of topic stats (#21361)
This is an automated email from the ASF dual-hosted git repository.
lhotari pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new d6a56ad3094 [improve][broker] Optimize and clean up aggregation of topic stats (#21361)
d6a56ad3094 is described below
commit d6a56ad3094e9da89fa5f7cf00e0b7bbb36e6563
Author: Lari Hotari <lh...@users.noreply.github.com>
AuthorDate: Sat Oct 14 13:31:27 2023 +0300
[improve][broker] Optimize and clean up aggregation of topic stats (#21361)
---
.../data/stats/NonPersistentTopicStatsImpl.java | 57 +++++++---------------
.../common/policies/data/stats/TopicStatsImpl.java | 49 +++++++------------
2 files changed, 35 insertions(+), 71 deletions(-)
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/stats/NonPersistentTopicStatsImpl.java b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/stats/NonPersistentTopicStatsImpl.java
index fd643f0db7b..7710c27779b 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/stats/NonPersistentTopicStatsImpl.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/stats/NonPersistentTopicStatsImpl.java
@@ -155,8 +155,9 @@ public class NonPersistentTopicStatsImpl extends TopicStatsImpl implements NonPe
Objects.requireNonNull(stats);
super.add(stats);
this.msgDropRate += stats.msgDropRate;
- for (int index = 0; index < stats.getNonPersistentPublishers().size(); index++) {
- NonPersistentPublisherStats s = stats.getNonPersistentPublishers().get(index);
+ List<NonPersistentPublisherStats> publisherStats = stats.getNonPersistentPublishers();
+ for (int index = 0; index < publisherStats.size(); index++) {
+ NonPersistentPublisherStats s = publisherStats.get(index);
if (s.isSupportsPartialProducer() && s.getProducerName() != null) {
((NonPersistentPublisherStatsImpl) this.nonPersistentPublishersMap
.computeIfAbsent(s.getProducerName(), key -> {
@@ -181,46 +182,24 @@ public class NonPersistentTopicStatsImpl extends TopicStatsImpl implements NonPe
}
}
- if (this.getNonPersistentSubscriptions().size() != stats.getNonPersistentSubscriptions().size()) {
- for (String subscription : stats.getNonPersistentSubscriptions().keySet()) {
- NonPersistentSubscriptionStatsImpl subscriptionStats = new NonPersistentSubscriptionStatsImpl();
- this.getNonPersistentSubscriptions().put(subscription, subscriptionStats
- .add((NonPersistentSubscriptionStatsImpl)
- stats.getNonPersistentSubscriptions().get(subscription)));
- }
- } else {
- for (String subscription : stats.getNonPersistentSubscriptions().keySet()) {
- if (this.getNonPersistentSubscriptions().get(subscription) != null) {
- ((NonPersistentSubscriptionStatsImpl) this.getNonPersistentSubscriptions().get(subscription))
- .add((NonPersistentSubscriptionStatsImpl)
- stats.getNonPersistentSubscriptions().get(subscription));
- } else {
- NonPersistentSubscriptionStatsImpl subscriptionStats = new NonPersistentSubscriptionStatsImpl();
- this.getNonPersistentSubscriptions().put(subscription, subscriptionStats
- .add((NonPersistentSubscriptionStatsImpl)
- stats.getNonPersistentSubscriptions().get(subscription)));
- }
- }
+ for (Map.Entry<String, NonPersistentSubscriptionStats> entry : stats.getNonPersistentSubscriptions()
+ .entrySet()) {
+ NonPersistentSubscriptionStatsImpl subscriptionStats =
+ (NonPersistentSubscriptionStatsImpl) this.getNonPersistentSubscriptions()
+ .computeIfAbsent(entry.getKey(), k -> new NonPersistentSubscriptionStatsImpl());
+ subscriptionStats.add(
+ (NonPersistentSubscriptionStatsImpl) entry.getValue());
}
- if (this.getNonPersistentReplicators().size() != stats.getNonPersistentReplicators().size()) {
- for (String repl : stats.getNonPersistentReplicators().keySet()) {
- NonPersistentReplicatorStatsImpl replStats = new NonPersistentReplicatorStatsImpl();
- this.getNonPersistentReplicators().put(repl, replStats
- .add((NonPersistentReplicatorStatsImpl) stats.getNonPersistentReplicators().get(repl)));
- }
- } else {
- for (String repl : stats.getNonPersistentReplicators().keySet()) {
- if (this.getNonPersistentReplicators().get(repl) != null) {
- ((NonPersistentReplicatorStatsImpl) this.getNonPersistentReplicators().get(repl))
- .add((NonPersistentReplicatorStatsImpl) stats.getNonPersistentReplicators().get(repl));
- } else {
- NonPersistentReplicatorStatsImpl replStats = new NonPersistentReplicatorStatsImpl();
- this.getNonPersistentReplicators().put(repl, replStats
- .add((NonPersistentReplicatorStatsImpl) stats.getNonPersistentReplicators().get(repl)));
- }
- }
+ for (Map.Entry<String, NonPersistentReplicatorStats> entry : stats.getNonPersistentReplicators().entrySet()) {
+ NonPersistentReplicatorStatsImpl replStats = (NonPersistentReplicatorStatsImpl)
+ this.getNonPersistentReplicators().computeIfAbsent(entry.getKey(), k -> {
+ NonPersistentReplicatorStatsImpl r = new NonPersistentReplicatorStatsImpl();
+ return r;
+ });
+ replStats.add((NonPersistentReplicatorStatsImpl) entry.getValue());
}
+
return this;
}
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/stats/TopicStatsImpl.java b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/stats/TopicStatsImpl.java
index e50620fb223..e022c885d66 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/stats/TopicStatsImpl.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/stats/TopicStatsImpl.java
@@ -259,8 +259,9 @@ public class TopicStatsImpl implements TopicStats {
topicMetricBean.value += v.value;
});
- for (int index = 0; index < stats.getPublishers().size(); index++) {
- PublisherStats s = stats.getPublishers().get(index);
+ List<? extends PublisherStats> publisherStats = stats.getPublishers();
+ for (int index = 0; index < publisherStats.size(); index++) {
+ PublisherStats s = publisherStats.get(index);
if (s.isSupportsPartialProducer() && s.getProducerName() != null) {
this.publishersMap.computeIfAbsent(s.getProducerName(), key -> {
final PublisherStatsImpl newStats = new PublisherStatsImpl();
@@ -284,38 +285,22 @@ public class TopicStatsImpl implements TopicStats {
}
}
- if (this.subscriptions.size() != stats.subscriptions.size()) {
- for (String subscription : stats.subscriptions.keySet()) {
- SubscriptionStatsImpl subscriptionStats = new SubscriptionStatsImpl();
- this.subscriptions.put(subscription, subscriptionStats.add(stats.subscriptions.get(subscription)));
- }
- } else {
- for (String subscription : stats.subscriptions.keySet()) {
- if (this.subscriptions.get(subscription) != null) {
- this.subscriptions.get(subscription).add(stats.subscriptions.get(subscription));
- } else {
- SubscriptionStatsImpl subscriptionStats = new SubscriptionStatsImpl();
- this.subscriptions.put(subscription, subscriptionStats.add(stats.subscriptions.get(subscription)));
- }
- }
+ for (Map.Entry<String, SubscriptionStatsImpl> entry : stats.subscriptions.entrySet()) {
+ SubscriptionStatsImpl subscriptionStats =
+ this.subscriptions.computeIfAbsent(entry.getKey(), k -> new SubscriptionStatsImpl());
+ subscriptionStats.add(entry.getValue());
}
- if (this.replication.size() != stats.replication.size()) {
- for (String repl : stats.replication.keySet()) {
- ReplicatorStatsImpl replStats = new ReplicatorStatsImpl();
- replStats.setConnected(true);
- this.replication.put(repl, replStats.add(stats.replication.get(repl)));
- }
- } else {
- for (String repl : stats.replication.keySet()) {
- if (this.replication.get(repl) != null) {
- this.replication.get(repl).add(stats.replication.get(repl));
- } else {
- ReplicatorStatsImpl replStats = new ReplicatorStatsImpl();
- replStats.setConnected(true);
- this.replication.put(repl, replStats.add(stats.replication.get(repl)));
- }
- }
+
+ for (Map.Entry<String, ReplicatorStatsImpl> entry : stats.replication.entrySet()) {
+ ReplicatorStatsImpl replStats =
+ this.replication.computeIfAbsent(entry.getKey(), k -> {
+ ReplicatorStatsImpl r = new ReplicatorStatsImpl();
+ r.setConnected(true);
+ return r;
+ });
+ replStats.add(entry.getValue());
}
+
if (earliestMsgPublishTimeInBacklogs != 0 && ((TopicStatsImpl) ts).earliestMsgPublishTimeInBacklogs != 0) {
earliestMsgPublishTimeInBacklogs = Math.min(
earliestMsgPublishTimeInBacklogs,