You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2021/08/09 09:32:34 UTC
[pulsar] branch master updated: [ManagedLedger] Make
'StatsPeroidSeconds' configurable (#11584)
This is an automated email from the ASF dual-hosted git repository.
penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new ddeae65 [ManagedLedger] Make 'StatsPeroidSeconds' configurable (#11584)
ddeae65 is described below
commit ddeae659be1f80f428fdb8c82c7a9a0c931c81fe
Author: GuoJiwei <te...@apache.org>
AuthorDate: Mon Aug 9 17:31:53 2021 +0800
[ManagedLedger] Make 'StatsPeroidSeconds' configurable (#11584)
### Motivation
Make `StatsPeriodSeconds` (the managed-ledger stats refresh period, previously hard-coded to 60 seconds) configurable.
### Modifications
- Move `StatsPeriodSeconds` from `ManagedLedgerFactoryImpl` to `ManagedLedgerFactoryConfig`.
- Add config `managedLedgerStatsPeriodSeconds`.
---
conf/broker.conf | 5 ++++-
.../bookkeeper/mledger/ManagedLedgerFactoryConfig.java | 15 ++++++++++-----
.../bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java | 4 +---
.../org/apache/pulsar/broker/ServiceConfiguration.java | 5 +++++
.../apache/pulsar/broker/ManagedLedgerClientFactory.java | 1 +
.../pulsar/broker/stats/metrics/ManagedLedgerMetrics.java | 12 ++++++++----
6 files changed, 29 insertions(+), 13 deletions(-)
diff --git a/conf/broker.conf b/conf/broker.conf
index 63324c2..e4c15c7 100644
--- a/conf/broker.conf
+++ b/conf/broker.conf
@@ -869,7 +869,10 @@ managedLedgerDefaultAckQuorum=2
# How frequently to flush the cursor positions that were accumulated due to rate limiting. (seconds).
# Default is 60 seconds
-managedLedgerCursorPositionFlushSeconds = 60
+managedLedgerCursorPositionFlushSeconds=60
+
+# How frequently to refresh the stats. (seconds). Default is 60 seconds
+managedLedgerStatsPeriodSeconds=60
# Default type of checksum to use when writing to BookKeeper. Default is "CRC32C"
# Other possible options are "CRC32", "MAC" or "DUMMY" (no checksum).
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerFactoryConfig.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerFactoryConfig.java
index ef92957..a00c161 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerFactoryConfig.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerFactoryConfig.java
@@ -48,22 +48,22 @@ public class ManagedLedgerFactoryConfig {
private double cacheEvictionFrequency = 100;
/**
- * All entries that have stayed in cache for more than the configured time, will be evicted
+ * All entries that have stayed in cache for more than the configured time, will be evicted.
*/
private long cacheEvictionTimeThresholdMillis = 1000;
/**
- * Whether we should make a copy of the entry payloads when inserting in cache
+ * Whether we should make a copy of the entry payloads when inserting in cache.
*/
private boolean copyEntriesInCache = false;
/**
- * Whether trace managed ledger task execution time
+ * Whether trace managed ledger task execution time.
*/
private boolean traceTaskExecution = true;
/**
- * Managed ledger prometheus stats Latency Rollover Seconds
+ * Managed ledger prometheus stats Latency Rollover Seconds.
*/
private int prometheusStatsLatencyRolloverSeconds = 60;
@@ -73,7 +73,12 @@ public class ManagedLedgerFactoryConfig {
private int cursorPositionFlushSeconds = 60;
/**
- * cluster name for prometheus stats
+ * How frequently to refresh the stats.
+ */
+ private int statsPeriodSeconds = 60;
+
+ /**
+ * cluster name for prometheus stats.
*/
private String clusterName;
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java
index 16d577a..03eab6f 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java
@@ -102,8 +102,6 @@ public class ManagedLedgerFactoryImpl implements ManagedLedgerFactory {
private final long cacheEvictionTimeThresholdNanos;
private final MetadataStore metadataStore;
- public static final int StatsPeriodSeconds = 60;
-
private static class PendingInitializeManagedLedger {
private final ManagedLedgerImpl ledger;
@@ -177,7 +175,7 @@ public class ManagedLedgerFactoryImpl implements ManagedLedgerFactory {
this.mbean = new ManagedLedgerFactoryMBeanImpl(this);
this.entryCacheManager = new EntryCacheManager(this);
this.statsTask = scheduledExecutor.scheduleAtFixedRate(this::refreshStats,
- 0, StatsPeriodSeconds, TimeUnit.SECONDS);
+ 0, config.getStatsPeriodSeconds(), TimeUnit.SECONDS);
this.flushCursorsTask = scheduledExecutor.scheduleAtFixedRate(this::flushCursors,
config.getCursorPositionFlushSeconds(), config.getCursorPositionFlushSeconds(), TimeUnit.SECONDS);
diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
index e9926f4..aaec952 100644
--- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
+++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
@@ -1389,6 +1389,11 @@ public class ServiceConfiguration implements PulsarConfiguration {
doc = "How frequently to flush the cursor positions that were accumulated due to rate limiting. (seconds). Default is 60 seconds")
private int managedLedgerCursorPositionFlushSeconds = 60;
+ @FieldContext(minValue = 1,
+ category = CATEGORY_STORAGE_ML,
+ doc = "How frequently to refresh the stats. (seconds). Default is 60 seconds")
+ private int managedLedgerStatsPeriodSeconds = 60;
+
//
//
@FieldContext(
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/ManagedLedgerClientFactory.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/ManagedLedgerClientFactory.java
index 431cb72..52572cf 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/ManagedLedgerClientFactory.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/ManagedLedgerClientFactory.java
@@ -70,6 +70,7 @@ public class ManagedLedgerClientFactory implements ManagedLedgerStorage {
managedLedgerFactoryConfig.setTraceTaskExecution(conf.isManagedLedgerTraceTaskExecution());
managedLedgerFactoryConfig.setCursorPositionFlushSeconds(conf.getManagedLedgerCursorPositionFlushSeconds());
managedLedgerFactoryConfig.setManagedLedgerInfoCompressionType(conf.getManagedLedgerInfoCompressionType());
+ managedLedgerFactoryConfig.setStatsPeriodSeconds(conf.getManagedLedgerStatsPeriodSeconds());
Configuration configuration = new ClientConfiguration();
if (conf.isBookkeeperClientExposeStatsToPrometheus()) {
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/metrics/ManagedLedgerMetrics.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/metrics/ManagedLedgerMetrics.java
index 889eb8e..8a9dd4a 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/metrics/ManagedLedgerMetrics.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/metrics/ManagedLedgerMetrics.java
@@ -46,11 +46,15 @@ public class ManagedLedgerMetrics extends AbstractMetrics {
private static final Buckets
BRK_ML_ENTRYSIZEBUCKETS = new Buckets("brk_ml_EntrySizeBuckets", ENTRY_SIZE_BUCKETS_BYTES);
+ private int statsPeriodSeconds;
+
public ManagedLedgerMetrics(PulsarService pulsar) {
super(pulsar);
this.metricsCollection = Lists.newArrayList();
this.ledgersByDimensionMap = Maps.newHashMap();
this.tempAggregatedMetricsMap = Maps.newHashMap();
+ this.statsPeriodSeconds = ((ManagedLedgerFactoryImpl) pulsar.getManagedLedgerFactory())
+ .getConfig().getStatsPeriodSeconds();
}
@Override
@@ -112,16 +116,16 @@ public class ManagedLedgerMetrics extends AbstractMetrics {
// handle bucket entries initialization here
BRK_ML_ADDENTRYLATENCYBUCKETS.populateBucketEntries(tempAggregatedMetricsMap,
lStats.getAddEntryLatencyBuckets(),
- ManagedLedgerFactoryImpl.StatsPeriodSeconds);
+ statsPeriodSeconds);
BRK_ML_LEDGERADDENTRYLATENCYBUCKETS.populateBucketEntries(tempAggregatedMetricsMap,
lStats.getLedgerAddEntryLatencyBuckets(),
- ManagedLedgerFactoryImpl.StatsPeriodSeconds);
+ statsPeriodSeconds);
BRK_ML_LEDGERSWITCHLATENCYBUCKETS.populateBucketEntries(tempAggregatedMetricsMap,
lStats.getLedgerSwitchLatencyBuckets(),
- ManagedLedgerFactoryImpl.StatsPeriodSeconds);
+ statsPeriodSeconds);
BRK_ML_ENTRYSIZEBUCKETS.populateBucketEntries(tempAggregatedMetricsMap,
lStats.getEntrySizeBuckets(),
- ManagedLedgerFactoryImpl.StatsPeriodSeconds);
+ statsPeriodSeconds);
populateAggregationMapWithSum(tempAggregatedMetricsMap, "brk_ml_MarkDeleteRate",
lStats.getMarkDeleteRate());
}