You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by mm...@apache.org on 2021/02/27 06:01:36 UTC

[pulsar] branch master updated: [Metrics] Reduce CPU consumption of metrics creation (#9735)

This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 3ff7989  [Metrics] Reduce CPU consumption of metrics creation (#9735)
3ff7989 is described below

commit 3ff798909239ee4c2e31f96721888f4984ce2548
Author: Lari Hotari <lh...@users.noreply.github.com>
AuthorDate: Sat Feb 27 08:00:44 2021 +0200

    [Metrics] Reduce CPU consumption of metrics creation (#9735)
    
    * [Metrics] Reduce CPU consumption of metrics creation
    
    * Use Map.compute to improve aggregation logic
---
 .../broker/stats/metrics/AbstractMetrics.java      | 72 +++++++++++++---------
 .../broker/stats/metrics/ManagedLedgerMetrics.java | 27 +++++---
 2 files changed, 62 insertions(+), 37 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/metrics/AbstractMetrics.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/metrics/AbstractMetrics.java
index 21b941e..ed6e79f 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/metrics/AbstractMetrics.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/metrics/AbstractMetrics.java
@@ -59,6 +59,50 @@ abstract class AbstractMetrics {
         }
     }
 
+    // simple abstraction for the buckets, their boundaries and pre-calculated keys
+    // pre-calculating the keys avoids a lot of object allocations during metric collection
+    static class Buckets {
+        private final double[] boundaries;
+        private final String[] bucketKeys;
+
+        Buckets(String metricKey, double[] boundaries) {
+            this.boundaries = boundaries;
+            this.bucketKeys = generateBucketKeys(metricKey, boundaries);
+        }
+
+        private static String[] generateBucketKeys(String mkey, double[] boundaries) {
+            String[] keys = new String[boundaries.length + 1];
+            for (int i = 0; i < boundaries.length + 1; i++) {
+                String bucketKey;
+                double value;
+
+                // example of key : "<metric_key>_0.0_0.5"
+                if (i == 0 && boundaries.length > 0) {
+                    bucketKey = String.format("%s_0.0_%1.1f", mkey, boundaries[i]);
+                } else if (i < boundaries.length) {
+                    bucketKey = String.format("%s_%1.1f_%1.1f", mkey, boundaries[i - 1], boundaries[i]);
+                } else {
+                    bucketKey = String.format("%s_OVERFLOW", mkey);
+                }
+                keys[i] = bucketKey;
+            }
+            return keys;
+        }
+
+        public void populateBucketEntries(Map<String, Double> map, long[] bucketValues, int period) {
+            // there must be one more bucket value than boundaries; the last element is the OVERFLOW bucket
+            if (bucketValues != null && bucketValues.length != boundaries.length + 1) {
+                throw new RuntimeException("Bucket boundary and value array length mismatch");
+            }
+
+            for (int i = 0; i < boundaries.length + 1; i++) {
+                double value = (bucketValues == null) ? 0.0D : ((double) bucketValues[i] / (period > 0 ? period : 1));
+                map.compute(bucketKeys[i], (key, currentValue) -> (currentValue == null ? 0.0d : currentValue) + value);
+            }
+        }
+    }
+
+
     protected final PulsarService pulsar;
 
     abstract List<Metrics> generate();
@@ -169,34 +213,6 @@ abstract class AbstractMetrics {
         return createMetrics(dimensionMap);
     }
 
-    protected void populateBucketEntries(Map<String, Double> map, String mkey, double[] boundaries,
-            long[] bucketValues, int period) {
-
-        // bucket values should be one more that the boundaries to have the last element as OVERFLOW
-        if (bucketValues != null && bucketValues.length != boundaries.length + 1) {
-            throw new RuntimeException("Bucket boundary and value array length mismatch");
-        }
-
-        for (int i = 0; i < boundaries.length + 1; i++) {
-            String bucketKey;
-            double value;
-
-            // example of key : "<metric_key>_0.0_0.5"
-            if (i == 0 && boundaries.length > 0) {
-                bucketKey = String.format("%s_0.0_%1.1f", mkey, boundaries[i]);
-            } else if (i < boundaries.length) {
-                bucketKey = String.format("%s_%1.1f_%1.1f", mkey, boundaries[i - 1], boundaries[i]);
-            } else {
-                bucketKey = String.format("%s_OVERFLOW", mkey);
-            }
-
-            value = (bucketValues == null) ? 0.0D : ((double) bucketValues[i] / (period > 0 ? period : 1));
-
-            Double val = map.getOrDefault(bucketKey, 0.0);
-            map.put(bucketKey, val + value);
-        }
-    }
-
     protected void populateAggregationMap(Map<String, List<Double>> map, String mkey, double value) {
         if (!map.containsKey(mkey)) {
             map.put(mkey, Lists.newArrayList(value));
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/metrics/ManagedLedgerMetrics.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/metrics/ManagedLedgerMetrics.java
index 273447c..8e15aff 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/metrics/ManagedLedgerMetrics.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/metrics/ManagedLedgerMetrics.java
@@ -35,6 +35,16 @@ public class ManagedLedgerMetrics extends AbstractMetrics {
     private Map<Metrics, List<ManagedLedgerImpl>> ledgersByDimensionMap;
     // temp map to prepare aggregation metrics
     private Map<String, Double> tempAggregatedMetricsMap;
+    private static final Buckets
+            BRK_ML_ADDENTRYLATENCYBUCKETS = new Buckets("brk_ml_AddEntryLatencyBuckets",
+            ENTRY_LATENCY_BUCKETS_MS);
+    private static final Buckets BRK_ML_LEDGERADDENTRYLATENCYBUCKETS = new Buckets(
+            "brk_ml_LedgerAddEntryLatencyBuckets", ENTRY_LATENCY_BUCKETS_MS);
+    private static final Buckets BRK_ML_LEDGERSWITCHLATENCYBUCKETS = new Buckets(
+            "brk_ml_LedgerSwitchLatencyBuckets", ENTRY_LATENCY_BUCKETS_MS);
+
+    private static final Buckets
+            BRK_ML_ENTRYSIZEBUCKETS = new Buckets("brk_ml_EntrySizeBuckets", ENTRY_SIZE_BUCKETS_BYTES);
 
     public ManagedLedgerMetrics(PulsarService pulsar) {
         super(pulsar);
@@ -98,18 +108,17 @@ public class ManagedLedgerMetrics extends AbstractMetrics {
                         (double) lStats.getStoredMessagesSize());
 
                 // handle bucket entries initialization here
-                populateBucketEntries(tempAggregatedMetricsMap, "brk_ml_AddEntryLatencyBuckets",
-                        ENTRY_LATENCY_BUCKETS_MS, lStats.getAddEntryLatencyBuckets(),
+                BRK_ML_ADDENTRYLATENCYBUCKETS.populateBucketEntries(tempAggregatedMetricsMap,
+                        lStats.getAddEntryLatencyBuckets(),
                         ManagedLedgerFactoryImpl.StatsPeriodSeconds);
-                populateBucketEntries(tempAggregatedMetricsMap, "brk_ml_LedgerAddEntryLatencyBuckets",
-                        ENTRY_LATENCY_BUCKETS_MS, lStats.getLedgerAddEntryLatencyBuckets(),
+                BRK_ML_LEDGERADDENTRYLATENCYBUCKETS.populateBucketEntries(tempAggregatedMetricsMap,
+                        lStats.getLedgerAddEntryLatencyBuckets(),
                         ManagedLedgerFactoryImpl.StatsPeriodSeconds);
-                populateBucketEntries(tempAggregatedMetricsMap, "brk_ml_LedgerSwitchLatencyBuckets",
-                        ENTRY_LATENCY_BUCKETS_MS, lStats.getLedgerSwitchLatencyBuckets(),
+                BRK_ML_LEDGERSWITCHLATENCYBUCKETS.populateBucketEntries(tempAggregatedMetricsMap,
+                        lStats.getLedgerSwitchLatencyBuckets(),
                         ManagedLedgerFactoryImpl.StatsPeriodSeconds);
-
-                populateBucketEntries(tempAggregatedMetricsMap, "brk_ml_EntrySizeBuckets",
-                        ENTRY_SIZE_BUCKETS_BYTES, lStats.getEntrySizeBuckets(),
+                BRK_ML_ENTRYSIZEBUCKETS.populateBucketEntries(tempAggregatedMetricsMap,
+                        lStats.getEntrySizeBuckets(),
                         ManagedLedgerFactoryImpl.StatsPeriodSeconds);
                 populateAggregationMapWithSum(tempAggregatedMetricsMap, "brk_ml_MarkDeleteRate",
                         lStats.getMarkDeleteRate());