You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2021/08/09 09:32:34 UTC

[pulsar] branch master updated: [ManagedLedger] Make 'StatsPeroidSeconds' configurable (#11584)

This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new ddeae65  [ManagedLedger] Make 'StatsPeroidSeconds' configurable (#11584)
ddeae65 is described below

commit ddeae659be1f80f428fdb8c82c7a9a0c931c81fe
Author: GuoJiwei <te...@apache.org>
AuthorDate: Mon Aug 9 17:31:53 2021 +0800

    [ManagedLedger] Make 'StatsPeroidSeconds' configurable (#11584)
    
    ### Motivation
    Make `StatsPeriodSeconds` configurable.
    
    ### Modifications
    - Move `StatsPeriodSeconds` from ManagedLedgerFactoryImpl to ManagedLedgerFactoryConfig.
    - Add config `managedLedgerStatsPeriodSeconds`.
---
 conf/broker.conf                                          |  5 ++++-
 .../bookkeeper/mledger/ManagedLedgerFactoryConfig.java    | 15 ++++++++++-----
 .../bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java |  4 +---
 .../org/apache/pulsar/broker/ServiceConfiguration.java    |  5 +++++
 .../apache/pulsar/broker/ManagedLedgerClientFactory.java  |  1 +
 .../pulsar/broker/stats/metrics/ManagedLedgerMetrics.java | 12 ++++++++----
 6 files changed, 29 insertions(+), 13 deletions(-)

diff --git a/conf/broker.conf b/conf/broker.conf
index 63324c2..e4c15c7 100644
--- a/conf/broker.conf
+++ b/conf/broker.conf
@@ -869,7 +869,10 @@ managedLedgerDefaultAckQuorum=2
 
 # How frequently to flush the cursor positions that were accumulated due to rate limiting. (seconds).
 # Default is 60 seconds
-managedLedgerCursorPositionFlushSeconds = 60
+managedLedgerCursorPositionFlushSeconds=60
+
+# How frequently to refresh the stats. (seconds). Default is 60 seconds
+managedLedgerStatsPeriodSeconds=60
 
 # Default type of checksum to use when writing to BookKeeper. Default is "CRC32C"
 # Other possible options are "CRC32", "MAC" or "DUMMY" (no checksum).
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerFactoryConfig.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerFactoryConfig.java
index ef92957..a00c161 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerFactoryConfig.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerFactoryConfig.java
@@ -48,22 +48,22 @@ public class ManagedLedgerFactoryConfig {
     private double cacheEvictionFrequency = 100;
 
     /**
-     * All entries that have stayed in cache for more than the configured time, will be evicted
+     * All entries that have stayed in cache for more than the configured time, will be evicted.
      */
     private long cacheEvictionTimeThresholdMillis = 1000;
 
     /**
-     * Whether we should make a copy of the entry payloads when inserting in cache
+     * Whether we should make a copy of the entry payloads when inserting in cache.
      */
     private boolean copyEntriesInCache = false;
 
     /**
-     * Whether trace managed ledger task execution time
+     * Whether trace managed ledger task execution time.
      */
     private boolean traceTaskExecution = true;
 
     /**
-     * Managed ledger prometheus stats Latency Rollover Seconds
+     * Managed ledger prometheus stats Latency Rollover Seconds.
      */
     private int prometheusStatsLatencyRolloverSeconds = 60;
 
@@ -73,7 +73,12 @@ public class ManagedLedgerFactoryConfig {
     private int cursorPositionFlushSeconds = 60;
 
     /**
-     * cluster name for prometheus stats
+     * How frequently to refresh the stats.
+     */
+    private int statsPeriodSeconds = 60;
+
+    /**
+     * cluster name for prometheus stats.
      */
     private String clusterName;
 
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java
index 16d577a..03eab6f 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java
@@ -102,8 +102,6 @@ public class ManagedLedgerFactoryImpl implements ManagedLedgerFactory {
     private final long cacheEvictionTimeThresholdNanos;
     private final MetadataStore metadataStore;
 
-    public static final int StatsPeriodSeconds = 60;
-
     private static class PendingInitializeManagedLedger {
 
         private final ManagedLedgerImpl ledger;
@@ -177,7 +175,7 @@ public class ManagedLedgerFactoryImpl implements ManagedLedgerFactory {
         this.mbean = new ManagedLedgerFactoryMBeanImpl(this);
         this.entryCacheManager = new EntryCacheManager(this);
         this.statsTask = scheduledExecutor.scheduleAtFixedRate(this::refreshStats,
-                0, StatsPeriodSeconds, TimeUnit.SECONDS);
+                0, config.getStatsPeriodSeconds(), TimeUnit.SECONDS);
         this.flushCursorsTask = scheduledExecutor.scheduleAtFixedRate(this::flushCursors,
                 config.getCursorPositionFlushSeconds(), config.getCursorPositionFlushSeconds(), TimeUnit.SECONDS);
 
diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
index e9926f4..aaec952 100644
--- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
+++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
@@ -1389,6 +1389,11 @@ public class ServiceConfiguration implements PulsarConfiguration {
             doc = "How frequently to flush the cursor positions that were accumulated due to rate limiting. (seconds). Default is 60 seconds")
     private int managedLedgerCursorPositionFlushSeconds = 60;
 
+    @FieldContext(minValue = 1,
+            category = CATEGORY_STORAGE_ML,
+            doc = "How frequently to refresh the stats. (seconds). Default is 60 seconds")
+    private int managedLedgerStatsPeriodSeconds = 60;
+
     //
     //
     @FieldContext(
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/ManagedLedgerClientFactory.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/ManagedLedgerClientFactory.java
index 431cb72..52572cf 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/ManagedLedgerClientFactory.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/ManagedLedgerClientFactory.java
@@ -70,6 +70,7 @@ public class ManagedLedgerClientFactory implements ManagedLedgerStorage {
         managedLedgerFactoryConfig.setTraceTaskExecution(conf.isManagedLedgerTraceTaskExecution());
         managedLedgerFactoryConfig.setCursorPositionFlushSeconds(conf.getManagedLedgerCursorPositionFlushSeconds());
         managedLedgerFactoryConfig.setManagedLedgerInfoCompressionType(conf.getManagedLedgerInfoCompressionType());
+        managedLedgerFactoryConfig.setStatsPeriodSeconds(conf.getManagedLedgerStatsPeriodSeconds());
 
         Configuration configuration = new ClientConfiguration();
         if (conf.isBookkeeperClientExposeStatsToPrometheus()) {
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/metrics/ManagedLedgerMetrics.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/metrics/ManagedLedgerMetrics.java
index 889eb8e..8a9dd4a 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/metrics/ManagedLedgerMetrics.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/metrics/ManagedLedgerMetrics.java
@@ -46,11 +46,15 @@ public class ManagedLedgerMetrics extends AbstractMetrics {
     private static final Buckets
             BRK_ML_ENTRYSIZEBUCKETS = new Buckets("brk_ml_EntrySizeBuckets", ENTRY_SIZE_BUCKETS_BYTES);
 
+    private int statsPeriodSeconds;
+
     public ManagedLedgerMetrics(PulsarService pulsar) {
         super(pulsar);
         this.metricsCollection = Lists.newArrayList();
         this.ledgersByDimensionMap = Maps.newHashMap();
         this.tempAggregatedMetricsMap = Maps.newHashMap();
+        this.statsPeriodSeconds = ((ManagedLedgerFactoryImpl) pulsar.getManagedLedgerFactory())
+                .getConfig().getStatsPeriodSeconds();
     }
 
     @Override
@@ -112,16 +116,16 @@ public class ManagedLedgerMetrics extends AbstractMetrics {
                 // handle bucket entries initialization here
                 BRK_ML_ADDENTRYLATENCYBUCKETS.populateBucketEntries(tempAggregatedMetricsMap,
                         lStats.getAddEntryLatencyBuckets(),
-                        ManagedLedgerFactoryImpl.StatsPeriodSeconds);
+                        statsPeriodSeconds);
                 BRK_ML_LEDGERADDENTRYLATENCYBUCKETS.populateBucketEntries(tempAggregatedMetricsMap,
                         lStats.getLedgerAddEntryLatencyBuckets(),
-                        ManagedLedgerFactoryImpl.StatsPeriodSeconds);
+                        statsPeriodSeconds);
                 BRK_ML_LEDGERSWITCHLATENCYBUCKETS.populateBucketEntries(tempAggregatedMetricsMap,
                         lStats.getLedgerSwitchLatencyBuckets(),
-                        ManagedLedgerFactoryImpl.StatsPeriodSeconds);
+                        statsPeriodSeconds);
                 BRK_ML_ENTRYSIZEBUCKETS.populateBucketEntries(tempAggregatedMetricsMap,
                         lStats.getEntrySizeBuckets(),
-                        ManagedLedgerFactoryImpl.StatsPeriodSeconds);
+                        statsPeriodSeconds);
                 populateAggregationMapWithSum(tempAggregatedMetricsMap, "brk_ml_MarkDeleteRate",
                         lStats.getMarkDeleteRate());
             }