You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by lh...@apache.org on 2023/10/14 10:31:35 UTC

[pulsar] branch master updated: [improve][broker] Optimize and clean up aggregation of topic stats (#21361)

This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new d6a56ad3094 [improve][broker] Optimize and clean up aggregation of topic stats (#21361)
d6a56ad3094 is described below

commit d6a56ad3094e9da89fa5f7cf00e0b7bbb36e6563
Author: Lari Hotari <lh...@users.noreply.github.com>
AuthorDate: Sat Oct 14 13:31:27 2023 +0300

    [improve][broker] Optimize and clean up aggregation of topic stats (#21361)
---
 .../data/stats/NonPersistentTopicStatsImpl.java    | 57 +++++++---------------
 .../common/policies/data/stats/TopicStatsImpl.java | 49 +++++++------------
 2 files changed, 35 insertions(+), 71 deletions(-)

diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/stats/NonPersistentTopicStatsImpl.java b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/stats/NonPersistentTopicStatsImpl.java
index fd643f0db7b..7710c27779b 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/stats/NonPersistentTopicStatsImpl.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/stats/NonPersistentTopicStatsImpl.java
@@ -155,8 +155,9 @@ public class NonPersistentTopicStatsImpl extends TopicStatsImpl implements NonPe
         Objects.requireNonNull(stats);
         super.add(stats);
         this.msgDropRate += stats.msgDropRate;
-        for (int index = 0; index < stats.getNonPersistentPublishers().size(); index++) {
-            NonPersistentPublisherStats s = stats.getNonPersistentPublishers().get(index);
+        List<NonPersistentPublisherStats> publisherStats = stats.getNonPersistentPublishers();
+        for (int index = 0; index < publisherStats.size(); index++) {
+            NonPersistentPublisherStats s = publisherStats.get(index);
             if (s.isSupportsPartialProducer() && s.getProducerName() != null) {
                 ((NonPersistentPublisherStatsImpl) this.nonPersistentPublishersMap
                         .computeIfAbsent(s.getProducerName(), key -> {
@@ -181,46 +182,24 @@ public class NonPersistentTopicStatsImpl extends TopicStatsImpl implements NonPe
             }
         }
 
-        if (this.getNonPersistentSubscriptions().size() != stats.getNonPersistentSubscriptions().size()) {
-            for (String subscription : stats.getNonPersistentSubscriptions().keySet()) {
-                NonPersistentSubscriptionStatsImpl subscriptionStats = new NonPersistentSubscriptionStatsImpl();
-                this.getNonPersistentSubscriptions().put(subscription, subscriptionStats
-                        .add((NonPersistentSubscriptionStatsImpl)
-                                stats.getNonPersistentSubscriptions().get(subscription)));
-            }
-        } else {
-            for (String subscription : stats.getNonPersistentSubscriptions().keySet()) {
-                if (this.getNonPersistentSubscriptions().get(subscription) != null) {
-                    ((NonPersistentSubscriptionStatsImpl) this.getNonPersistentSubscriptions().get(subscription))
-                          .add((NonPersistentSubscriptionStatsImpl)
-                                  stats.getNonPersistentSubscriptions().get(subscription));
-                } else {
-                    NonPersistentSubscriptionStatsImpl subscriptionStats = new NonPersistentSubscriptionStatsImpl();
-                    this.getNonPersistentSubscriptions().put(subscription, subscriptionStats
-                         .add((NonPersistentSubscriptionStatsImpl)
-                                 stats.getNonPersistentSubscriptions().get(subscription)));
-                }
-            }
+        for (Map.Entry<String, NonPersistentSubscriptionStats> entry : stats.getNonPersistentSubscriptions()
+                .entrySet()) {
+            NonPersistentSubscriptionStatsImpl subscriptionStats =
+                    (NonPersistentSubscriptionStatsImpl) this.getNonPersistentSubscriptions()
+                            .computeIfAbsent(entry.getKey(), k -> new NonPersistentSubscriptionStatsImpl());
+            subscriptionStats.add(
+                    (NonPersistentSubscriptionStatsImpl) entry.getValue());
         }
 
-        if (this.getNonPersistentReplicators().size() != stats.getNonPersistentReplicators().size()) {
-            for (String repl : stats.getNonPersistentReplicators().keySet()) {
-                NonPersistentReplicatorStatsImpl replStats = new NonPersistentReplicatorStatsImpl();
-                this.getNonPersistentReplicators().put(repl, replStats
-                        .add((NonPersistentReplicatorStatsImpl) stats.getNonPersistentReplicators().get(repl)));
-            }
-        } else {
-            for (String repl : stats.getNonPersistentReplicators().keySet()) {
-                if (this.getNonPersistentReplicators().get(repl) != null) {
-                    ((NonPersistentReplicatorStatsImpl) this.getNonPersistentReplicators().get(repl))
-                            .add((NonPersistentReplicatorStatsImpl) stats.getNonPersistentReplicators().get(repl));
-                } else {
-                    NonPersistentReplicatorStatsImpl replStats = new NonPersistentReplicatorStatsImpl();
-                    this.getNonPersistentReplicators().put(repl, replStats
-                            .add((NonPersistentReplicatorStatsImpl) stats.getNonPersistentReplicators().get(repl)));
-                }
-            }
+        for (Map.Entry<String, NonPersistentReplicatorStats> entry : stats.getNonPersistentReplicators().entrySet()) {
+            NonPersistentReplicatorStatsImpl replStats = (NonPersistentReplicatorStatsImpl)
+                    this.getNonPersistentReplicators().computeIfAbsent(entry.getKey(), k -> {
+                        NonPersistentReplicatorStatsImpl r = new NonPersistentReplicatorStatsImpl();
+                        return r;
+                    });
+            replStats.add((NonPersistentReplicatorStatsImpl) entry.getValue());
         }
+
         return this;
     }
 
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/stats/TopicStatsImpl.java b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/stats/TopicStatsImpl.java
index e50620fb223..e022c885d66 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/stats/TopicStatsImpl.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/stats/TopicStatsImpl.java
@@ -259,8 +259,9 @@ public class TopicStatsImpl implements TopicStats {
             topicMetricBean.value += v.value;
         });
 
-        for (int index = 0; index < stats.getPublishers().size(); index++) {
-           PublisherStats s = stats.getPublishers().get(index);
+        List<? extends PublisherStats> publisherStats = stats.getPublishers();
+        for (int index = 0; index < publisherStats.size(); index++) {
+           PublisherStats s = publisherStats.get(index);
            if (s.isSupportsPartialProducer() && s.getProducerName() != null) {
                this.publishersMap.computeIfAbsent(s.getProducerName(), key -> {
                    final PublisherStatsImpl newStats = new PublisherStatsImpl();
@@ -284,38 +285,22 @@ public class TopicStatsImpl implements TopicStats {
            }
         }
 
-        if (this.subscriptions.size() != stats.subscriptions.size()) {
-            for (String subscription : stats.subscriptions.keySet()) {
-                SubscriptionStatsImpl subscriptionStats = new SubscriptionStatsImpl();
-                this.subscriptions.put(subscription, subscriptionStats.add(stats.subscriptions.get(subscription)));
-            }
-        } else {
-            for (String subscription : stats.subscriptions.keySet()) {
-                if (this.subscriptions.get(subscription) != null) {
-                    this.subscriptions.get(subscription).add(stats.subscriptions.get(subscription));
-                } else {
-                    SubscriptionStatsImpl subscriptionStats = new SubscriptionStatsImpl();
-                    this.subscriptions.put(subscription, subscriptionStats.add(stats.subscriptions.get(subscription)));
-                }
-            }
+        for (Map.Entry<String, SubscriptionStatsImpl> entry : stats.subscriptions.entrySet()) {
+            SubscriptionStatsImpl subscriptionStats =
+                    this.subscriptions.computeIfAbsent(entry.getKey(), k -> new SubscriptionStatsImpl());
+            subscriptionStats.add(entry.getValue());
         }
-        if (this.replication.size() != stats.replication.size()) {
-            for (String repl : stats.replication.keySet()) {
-                ReplicatorStatsImpl replStats = new ReplicatorStatsImpl();
-                replStats.setConnected(true);
-                this.replication.put(repl, replStats.add(stats.replication.get(repl)));
-            }
-        } else {
-            for (String repl : stats.replication.keySet()) {
-                if (this.replication.get(repl) != null) {
-                    this.replication.get(repl).add(stats.replication.get(repl));
-                } else {
-                    ReplicatorStatsImpl replStats = new ReplicatorStatsImpl();
-                    replStats.setConnected(true);
-                    this.replication.put(repl, replStats.add(stats.replication.get(repl)));
-                }
-            }
+
+        for (Map.Entry<String, ReplicatorStatsImpl> entry : stats.replication.entrySet()) {
+            ReplicatorStatsImpl replStats =
+                    this.replication.computeIfAbsent(entry.getKey(), k -> {
+                        ReplicatorStatsImpl r = new ReplicatorStatsImpl();
+                        r.setConnected(true);
+                        return r;
+                    });
+            replStats.add(entry.getValue());
         }
+
         if (earliestMsgPublishTimeInBacklogs != 0 && ((TopicStatsImpl) ts).earliestMsgPublishTimeInBacklogs != 0) {
             earliestMsgPublishTimeInBacklogs = Math.min(
                     earliestMsgPublishTimeInBacklogs,