You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by mm...@apache.org on 2021/02/27 06:01:36 UTC
[pulsar] branch master updated: [Metrics] Reduce CPU consumption of
metrics creation (#9735)
This is an automated email from the ASF dual-hosted git repository.
mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 3ff7989 [Metrics] Reduce CPU consumption of metrics creation (#9735)
3ff7989 is described below
commit 3ff798909239ee4c2e31f96721888f4984ce2548
Author: Lari Hotari <lh...@users.noreply.github.com>
AuthorDate: Sat Feb 27 08:00:44 2021 +0200
[Metrics] Reduce CPU consumption of metrics creation (#9735)
* [Metrics] Reduce CPU consumption of metrics creation
* Use Map.compute to improve aggregation logic
---
.../broker/stats/metrics/AbstractMetrics.java | 72 +++++++++++++---------
.../broker/stats/metrics/ManagedLedgerMetrics.java | 27 +++++---
2 files changed, 62 insertions(+), 37 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/metrics/AbstractMetrics.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/metrics/AbstractMetrics.java
index 21b941e..ed6e79f 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/metrics/AbstractMetrics.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/metrics/AbstractMetrics.java
@@ -59,6 +59,50 @@ abstract class AbstractMetrics {
}
}
+ // simple abstract for the buckets, their boundaries and pre-calculated keys
+ // pre-calculating the keys avoids a lot of object allocations during metric collection
+ static class Buckets {
+ private final double[] boundaries;
+ private final String[] bucketKeys;
+
+ Buckets(String metricKey, double[] boundaries) {
+ this.boundaries = boundaries;
+ this.bucketKeys = generateBucketKeys(metricKey, boundaries);
+ }
+
+ private static String[] generateBucketKeys(String mkey, double[] boundaries) {
+ String[] keys = new String[boundaries.length + 1];
+ for (int i = 0; i < boundaries.length + 1; i++) {
+ String bucketKey;
+ double value;
+
+ // example of key : "<metric_key>_0.0_0.5"
+ if (i == 0 && boundaries.length > 0) {
+ bucketKey = String.format("%s_0.0_%1.1f", mkey, boundaries[i]);
+ } else if (i < boundaries.length) {
+ bucketKey = String.format("%s_%1.1f_%1.1f", mkey, boundaries[i - 1], boundaries[i]);
+ } else {
+ bucketKey = String.format("%s_OVERFLOW", mkey);
+ }
+ keys[i] = bucketKey;
+ }
+ return keys;
+ }
+
+ public void populateBucketEntries(Map<String, Double> map, long[] bucketValues, int period) {
+ // bucket values should be one more that the boundaries to have the last element as OVERFLOW
+ if (bucketValues != null && bucketValues.length != boundaries.length + 1) {
+ throw new RuntimeException("Bucket boundary and value array length mismatch");
+ }
+
+ for (int i = 0; i < boundaries.length + 1; i++) {
+ double value = (bucketValues == null) ? 0.0D : ((double) bucketValues[i] / (period > 0 ? period : 1));
+ map.compute(bucketKeys[i], (key, currentValue) -> (currentValue == null ? 0.0d : currentValue) + value);
+ }
+ }
+ }
+
+
protected final PulsarService pulsar;
abstract List<Metrics> generate();
@@ -169,34 +213,6 @@ abstract class AbstractMetrics {
return createMetrics(dimensionMap);
}
- protected void populateBucketEntries(Map<String, Double> map, String mkey, double[] boundaries,
- long[] bucketValues, int period) {
-
- // bucket values should be one more that the boundaries to have the last element as OVERFLOW
- if (bucketValues != null && bucketValues.length != boundaries.length + 1) {
- throw new RuntimeException("Bucket boundary and value array length mismatch");
- }
-
- for (int i = 0; i < boundaries.length + 1; i++) {
- String bucketKey;
- double value;
-
- // example of key : "<metric_key>_0.0_0.5"
- if (i == 0 && boundaries.length > 0) {
- bucketKey = String.format("%s_0.0_%1.1f", mkey, boundaries[i]);
- } else if (i < boundaries.length) {
- bucketKey = String.format("%s_%1.1f_%1.1f", mkey, boundaries[i - 1], boundaries[i]);
- } else {
- bucketKey = String.format("%s_OVERFLOW", mkey);
- }
-
- value = (bucketValues == null) ? 0.0D : ((double) bucketValues[i] / (period > 0 ? period : 1));
-
- Double val = map.getOrDefault(bucketKey, 0.0);
- map.put(bucketKey, val + value);
- }
- }
-
protected void populateAggregationMap(Map<String, List<Double>> map, String mkey, double value) {
if (!map.containsKey(mkey)) {
map.put(mkey, Lists.newArrayList(value));
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/metrics/ManagedLedgerMetrics.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/metrics/ManagedLedgerMetrics.java
index 273447c..8e15aff 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/metrics/ManagedLedgerMetrics.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/metrics/ManagedLedgerMetrics.java
@@ -35,6 +35,16 @@ public class ManagedLedgerMetrics extends AbstractMetrics {
private Map<Metrics, List<ManagedLedgerImpl>> ledgersByDimensionMap;
// temp map to prepare aggregation metrics
private Map<String, Double> tempAggregatedMetricsMap;
+ private static final Buckets
+ BRK_ML_ADDENTRYLATENCYBUCKETS = new Buckets("brk_ml_AddEntryLatencyBuckets",
+ ENTRY_LATENCY_BUCKETS_MS);
+ private static final Buckets BRK_ML_LEDGERADDENTRYLATENCYBUCKETS = new Buckets(
+ "brk_ml_LedgerAddEntryLatencyBuckets", ENTRY_LATENCY_BUCKETS_MS);
+ private static final Buckets BRK_ML_LEDGERSWITCHLATENCYBUCKETS = new Buckets(
+ "brk_ml_LedgerSwitchLatencyBuckets", ENTRY_LATENCY_BUCKETS_MS);
+
+ private static final Buckets
+ BRK_ML_ENTRYSIZEBUCKETS = new Buckets("brk_ml_EntrySizeBuckets", ENTRY_SIZE_BUCKETS_BYTES);
public ManagedLedgerMetrics(PulsarService pulsar) {
super(pulsar);
@@ -98,18 +108,17 @@ public class ManagedLedgerMetrics extends AbstractMetrics {
(double) lStats.getStoredMessagesSize());
// handle bucket entries initialization here
- populateBucketEntries(tempAggregatedMetricsMap, "brk_ml_AddEntryLatencyBuckets",
- ENTRY_LATENCY_BUCKETS_MS, lStats.getAddEntryLatencyBuckets(),
+ BRK_ML_ADDENTRYLATENCYBUCKETS.populateBucketEntries(tempAggregatedMetricsMap,
+ lStats.getAddEntryLatencyBuckets(),
ManagedLedgerFactoryImpl.StatsPeriodSeconds);
- populateBucketEntries(tempAggregatedMetricsMap, "brk_ml_LedgerAddEntryLatencyBuckets",
- ENTRY_LATENCY_BUCKETS_MS, lStats.getLedgerAddEntryLatencyBuckets(),
+ BRK_ML_LEDGERADDENTRYLATENCYBUCKETS.populateBucketEntries(tempAggregatedMetricsMap,
+ lStats.getLedgerAddEntryLatencyBuckets(),
ManagedLedgerFactoryImpl.StatsPeriodSeconds);
- populateBucketEntries(tempAggregatedMetricsMap, "brk_ml_LedgerSwitchLatencyBuckets",
- ENTRY_LATENCY_BUCKETS_MS, lStats.getLedgerSwitchLatencyBuckets(),
+ BRK_ML_LEDGERSWITCHLATENCYBUCKETS.populateBucketEntries(tempAggregatedMetricsMap,
+ lStats.getLedgerSwitchLatencyBuckets(),
ManagedLedgerFactoryImpl.StatsPeriodSeconds);
-
- populateBucketEntries(tempAggregatedMetricsMap, "brk_ml_EntrySizeBuckets",
- ENTRY_SIZE_BUCKETS_BYTES, lStats.getEntrySizeBuckets(),
+ BRK_ML_ENTRYSIZEBUCKETS.populateBucketEntries(tempAggregatedMetricsMap,
+ lStats.getEntrySizeBuckets(),
ManagedLedgerFactoryImpl.StatsPeriodSeconds);
populateAggregationMapWithSum(tempAggregatedMetricsMap, "brk_ml_MarkDeleteRate",
lStats.getMarkDeleteRate());