You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by eo...@apache.org on 2022/07/14 07:07:09 UTC
[pulsar] branch master updated: Offloaders: fix metrics (#16405)
This is an automated email from the ASF dual-hosted git repository.
eolivelli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new ac3f3eee601 Offloaders: fix metrics (#16405)
ac3f3eee601 is described below
commit ac3f3eee601fb872ddb44d52642e1d31f51e930d
Author: Enrico Olivelli <eo...@apache.org>
AuthorDate: Thu Jul 14 09:06:58 2022 +0200
Offloaders: fix metrics (#16405)
---
.../mledger/impl/LedgerOffloaderStatsImpl.java | 24 +++++++++++++++++++---
.../org/apache/pulsar/broker/PulsarService.java | 19 ++++++++++++-----
.../impl/BlobStoreBackedInputStreamImpl.java | 14 ++++++++-----
3 files changed, 44 insertions(+), 13 deletions(-)
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/LedgerOffloaderStatsImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/LedgerOffloaderStatsImpl.java
index 64b0d125162..72382b1ac45 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/LedgerOffloaderStatsImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/LedgerOffloaderStatsImpl.java
@@ -54,6 +54,7 @@ public final class LedgerOffloaderStatsImpl implements LedgerOffloaderStats, Run
private final Summary readLedgerLatency;
private final Counter writeStorageError;
private final Counter readOffloadError;
+ private final Counter readOffloadBytes;
private final Gauge readOffloadRate;
private final Summary readOffloadIndexLatency;
private final Summary readOffloadDataLatency;
@@ -86,15 +87,30 @@ public final class LedgerOffloaderStatsImpl implements LedgerOffloaderStats, Run
.labelNames(labels).create().register();
this.readOffloadRate = Gauge.build("brk_ledgeroffloader_read_offload_rate", "-")
.labelNames(labels).create().register();
+ this.readOffloadBytes = Counter.build("brk_ledgeroffloader_read_bytes", "-")
+ .labelNames(labels).create().register();
this.writeStorageError = Counter.build("brk_ledgeroffloader_write_storage_error", "-")
.labelNames(labels).create().register();
this.readOffloadIndexLatency = Summary.build("brk_ledgeroffloader_read_offload_index_latency", "-")
- .labelNames(labels).create().register();
+ .labelNames(labels).quantile(0.50, 0.01)
+ .quantile(0.95, 0.01)
+ .quantile(0.99, 0.01)
+ .quantile(1, 0.01)
+ .create().register();
this.readOffloadDataLatency = Summary.build("brk_ledgeroffloader_read_offload_data_latency", "-")
- .labelNames(labels).create().register();
+ .labelNames(labels)
+ .quantile(0.50, 0.01)
+ .quantile(0.95, 0.01)
+ .quantile(0.99, 0.01)
+ .quantile(1, 0.01)
+ .create().register();
this.readLedgerLatency = Summary.build("brk_ledgeroffloader_read_ledger_latency", "-")
- .labelNames(labels).create().register();
+ .labelNames(labels).quantile(0.50, 0.01)
+ .quantile(0.95, 0.01)
+ .quantile(0.99, 0.01)
+ .quantile(1, 0.01)
+ .create().register();
String[] deleteOpsLabels = exposeTopicLevelMetrics
? new String[]{NAMESPACE_LABEL, TOPIC_LABEL, STATUS} : new String[]{NAMESPACE_LABEL, STATUS};
@@ -156,6 +172,8 @@ public final class LedgerOffloaderStatsImpl implements LedgerOffloaderStats, Run
Pair<LongAdder, LongAdder> pair = this.offloadAndReadOffloadBytesMap
.computeIfAbsent(topic, __ -> new ImmutablePair<>(new LongAdder(), new LongAdder()));
pair.getRight().add(size);
+ String[] labelValues = this.labelValues(topic);
+ this.readOffloadBytes.labels(labelValues).inc(size);
this.addOrUpdateTopicAccess(topic);
}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
index ea7b4481b23..a79499438f5 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
@@ -209,7 +209,7 @@ public class PulsarService implements AutoCloseable, ShutdownService {
private OrderedScheduler offloaderScheduler;
private OffloadersCache offloadersCache = new OffloadersCache();
private LedgerOffloader defaultOffloader;
- private final LedgerOffloaderStats offloaderStats;
+ private LedgerOffloaderStats offloaderStats;
private Map<NamespaceName, LedgerOffloader> ledgerOffloaderMap = new ConcurrentHashMap<>();
private ScheduledFuture<?> loadReportTask = null;
private ScheduledFuture<?> loadSheddingTask = null;
@@ -346,8 +346,8 @@ public class PulsarService implements AutoCloseable, ShutdownService {
int interval = config.getManagedLedgerStatsPeriodSeconds();
boolean exposeTopicMetrics = config.isExposeTopicLevelMetricsInPrometheus();
- this.offloaderStats = LedgerOffloaderStats.create(config.isExposeManagedLedgerMetricsInPrometheus(),
- exposeTopicMetrics, this.getOffloaderScheduler(), interval);
+ // here in the constructor we don't have the offloader scheduler yet
+ this.offloaderStats = LedgerOffloaderStats.create(false, false, null, 0);
}
public MetadataStore createConfigurationMetadataStore() throws MetadataStoreException {
@@ -741,8 +741,17 @@ public class PulsarService implements AutoCloseable, ShutdownService {
schemaRegistryService = SchemaRegistryService.create(
schemaStorage, config.getSchemaRegistryCompatibilityCheckers(), this.executor);
- this.defaultOffloader = createManagedLedgerOffloader(
- OffloadPoliciesImpl.create(this.getConfiguration().getProperties()));
+ OffloadPoliciesImpl defaultOffloadPolicies =
+ OffloadPoliciesImpl.create(this.getConfiguration().getProperties());
+ this.defaultOffloader = createManagedLedgerOffloader(defaultOffloadPolicies);
+
+ OrderedScheduler offloaderScheduler = getOffloaderScheduler(defaultOffloadPolicies);
+ int interval = config.getManagedLedgerStatsPeriodSeconds();
+ boolean exposeTopicMetrics = config.isExposeTopicLevelMetricsInPrometheus();
+
+ offloaderStats = LedgerOffloaderStats.create(config.isExposeManagedLedgerMetricsInPrometheus(),
+ exposeTopicMetrics, offloaderScheduler, interval);
+
this.brokerInterceptor = BrokerInterceptors.load(config);
brokerService.setInterceptor(getBrokerInterceptor());
this.brokerInterceptor.initialize(this);
diff --git a/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreBackedInputStreamImpl.java b/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreBackedInputStreamImpl.java
index 9a1e7643215..07af1e68af8 100644
--- a/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreBackedInputStreamImpl.java
+++ b/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreBackedInputStreamImpl.java
@@ -92,11 +92,6 @@ public class BlobStoreBackedInputStreamImpl extends BackedInputStream {
try {
long startReadTime = System.nanoTime();
Blob blob = blobStore.getBlob(bucket, key, new GetOptions().range(startRange, endRange));
- if (this.offloaderStats != null) {
- this.offloaderStats.recordReadOffloadDataLatency(managedLedgerName,
- System.nanoTime() - startReadTime, TimeUnit.NANOSECONDS);
- this.offloaderStats.recordReadOffloadBytes(managedLedgerName, endRange - startRange + 1);
- }
versionCheck.check(key, blob);
try (InputStream stream = blob.getPayload().openStream()) {
@@ -110,6 +105,15 @@ public class BlobStoreBackedInputStreamImpl extends BackedInputStream {
}
cursor += buffer.readableBytes();
}
+
+ // here we can get the metrics
+ // because JClouds streams the content
+ // and actually the HTTP call finishes when the stream is fully read
+ if (this.offloaderStats != null) {
+ this.offloaderStats.recordReadOffloadDataLatency(managedLedgerName,
+ System.nanoTime() - startReadTime, TimeUnit.NANOSECONDS);
+ this.offloaderStats.recordReadOffloadBytes(managedLedgerName, endRange - startRange + 1);
+ }
} catch (Throwable e) {
if (null != this.offloaderStats) {
this.offloaderStats.recordReadOffloadError(this.managedLedgerName);