You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by eo...@apache.org on 2022/07/14 07:07:09 UTC

[pulsar] branch master updated: Offloaders: fix metrics (#16405)

This is an automated email from the ASF dual-hosted git repository.

eolivelli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new ac3f3eee601 Offloaders: fix metrics (#16405)
ac3f3eee601 is described below

commit ac3f3eee601fb872ddb44d52642e1d31f51e930d
Author: Enrico Olivelli <eo...@apache.org>
AuthorDate: Thu Jul 14 09:06:58 2022 +0200

    Offloaders: fix metrics (#16405)
---
 .../mledger/impl/LedgerOffloaderStatsImpl.java     | 24 +++++++++++++++++++---
 .../org/apache/pulsar/broker/PulsarService.java    | 19 ++++++++++++-----
 .../impl/BlobStoreBackedInputStreamImpl.java       | 14 ++++++++-----
 3 files changed, 44 insertions(+), 13 deletions(-)

diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/LedgerOffloaderStatsImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/LedgerOffloaderStatsImpl.java
index 64b0d125162..72382b1ac45 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/LedgerOffloaderStatsImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/LedgerOffloaderStatsImpl.java
@@ -54,6 +54,7 @@ public final class LedgerOffloaderStatsImpl implements LedgerOffloaderStats, Run
     private final Summary readLedgerLatency;
     private final Counter writeStorageError;
     private final Counter readOffloadError;
+    private final Counter readOffloadBytes;
     private final Gauge readOffloadRate;
     private final Summary readOffloadIndexLatency;
     private final Summary readOffloadDataLatency;
@@ -86,15 +87,30 @@ public final class LedgerOffloaderStatsImpl implements LedgerOffloaderStats, Run
                 .labelNames(labels).create().register();
         this.readOffloadRate = Gauge.build("brk_ledgeroffloader_read_offload_rate", "-")
                 .labelNames(labels).create().register();
+         this.readOffloadBytes = Counter.build("brk_ledgeroffloader_read_bytes", "-")
+                 .labelNames(labels).create().register();
         this.writeStorageError = Counter.build("brk_ledgeroffloader_write_storage_error", "-")
                 .labelNames(labels).create().register();
 
         this.readOffloadIndexLatency = Summary.build("brk_ledgeroffloader_read_offload_index_latency", "-")
-                .labelNames(labels).create().register();
+                .labelNames(labels).quantile(0.50, 0.01)
+                .quantile(0.95, 0.01)
+                .quantile(0.99, 0.01)
+                .quantile(1, 0.01)
+                .create().register();
         this.readOffloadDataLatency = Summary.build("brk_ledgeroffloader_read_offload_data_latency", "-")
-                .labelNames(labels).create().register();
+                .labelNames(labels)
+                .quantile(0.50, 0.01)
+                .quantile(0.95, 0.01)
+                .quantile(0.99, 0.01)
+                .quantile(1, 0.01)
+                .create().register();
         this.readLedgerLatency = Summary.build("brk_ledgeroffloader_read_ledger_latency", "-")
-                .labelNames(labels).create().register();
+                .labelNames(labels).quantile(0.50, 0.01)
+                .quantile(0.95, 0.01)
+                .quantile(0.99, 0.01)
+                .quantile(1, 0.01)
+                .create().register();
 
         String[] deleteOpsLabels = exposeTopicLevelMetrics
                 ? new String[]{NAMESPACE_LABEL, TOPIC_LABEL, STATUS} : new String[]{NAMESPACE_LABEL, STATUS};
@@ -156,6 +172,8 @@ public final class LedgerOffloaderStatsImpl implements LedgerOffloaderStats, Run
         Pair<LongAdder, LongAdder> pair = this.offloadAndReadOffloadBytesMap
                 .computeIfAbsent(topic, __ -> new ImmutablePair<>(new LongAdder(), new LongAdder()));
         pair.getRight().add(size);
+        String[] labelValues = this.labelValues(topic);
+        this.readOffloadBytes.labels(labelValues).inc(size);
         this.addOrUpdateTopicAccess(topic);
     }
 
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
index ea7b4481b23..a79499438f5 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
@@ -209,7 +209,7 @@ public class PulsarService implements AutoCloseable, ShutdownService {
     private OrderedScheduler offloaderScheduler;
     private OffloadersCache offloadersCache = new OffloadersCache();
     private LedgerOffloader defaultOffloader;
-    private final LedgerOffloaderStats offloaderStats;
+    private LedgerOffloaderStats offloaderStats;
     private Map<NamespaceName, LedgerOffloader> ledgerOffloaderMap = new ConcurrentHashMap<>();
     private ScheduledFuture<?> loadReportTask = null;
     private ScheduledFuture<?> loadSheddingTask = null;
@@ -346,8 +346,8 @@ public class PulsarService implements AutoCloseable, ShutdownService {
 
         int interval = config.getManagedLedgerStatsPeriodSeconds();
         boolean exposeTopicMetrics = config.isExposeTopicLevelMetricsInPrometheus();
-        this.offloaderStats = LedgerOffloaderStats.create(config.isExposeManagedLedgerMetricsInPrometheus(),
-                exposeTopicMetrics, this.getOffloaderScheduler(), interval);
+        // The offloader scheduler is not available in the constructor yet, so create a placeholder for now
+        this.offloaderStats = LedgerOffloaderStats.create(false, false, null, 0);
     }
 
     public MetadataStore createConfigurationMetadataStore() throws MetadataStoreException {
@@ -741,8 +741,17 @@ public class PulsarService implements AutoCloseable, ShutdownService {
             schemaRegistryService = SchemaRegistryService.create(
                     schemaStorage, config.getSchemaRegistryCompatibilityCheckers(), this.executor);
 
-            this.defaultOffloader = createManagedLedgerOffloader(
-                    OffloadPoliciesImpl.create(this.getConfiguration().getProperties()));
+            OffloadPoliciesImpl defaultOffloadPolicies =
+                    OffloadPoliciesImpl.create(this.getConfiguration().getProperties());
+            this.defaultOffloader = createManagedLedgerOffloader(defaultOffloadPolicies);
+
+            OrderedScheduler offloaderScheduler = getOffloaderScheduler(defaultOffloadPolicies);
+            int interval = config.getManagedLedgerStatsPeriodSeconds();
+            boolean exposeTopicMetrics = config.isExposeTopicLevelMetricsInPrometheus();
+
+            offloaderStats = LedgerOffloaderStats.create(config.isExposeManagedLedgerMetricsInPrometheus(),
+                    exposeTopicMetrics, offloaderScheduler, interval);
+
             this.brokerInterceptor = BrokerInterceptors.load(config);
             brokerService.setInterceptor(getBrokerInterceptor());
             this.brokerInterceptor.initialize(this);
diff --git a/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreBackedInputStreamImpl.java b/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreBackedInputStreamImpl.java
index 9a1e7643215..07af1e68af8 100644
--- a/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreBackedInputStreamImpl.java
+++ b/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreBackedInputStreamImpl.java
@@ -92,11 +92,6 @@ public class BlobStoreBackedInputStreamImpl extends BackedInputStream {
             try {
                 long startReadTime = System.nanoTime();
                 Blob blob = blobStore.getBlob(bucket, key, new GetOptions().range(startRange, endRange));
-                if (this.offloaderStats != null) {
-                    this.offloaderStats.recordReadOffloadDataLatency(managedLedgerName,
-                            System.nanoTime() - startReadTime, TimeUnit.NANOSECONDS);
-                    this.offloaderStats.recordReadOffloadBytes(managedLedgerName, endRange - startRange + 1);
-                }
                 versionCheck.check(key, blob);
 
                 try (InputStream stream = blob.getPayload().openStream()) {
@@ -110,6 +105,15 @@ public class BlobStoreBackedInputStreamImpl extends BackedInputStream {
                     }
                     cursor += buffer.readableBytes();
                 }
+
+                // Record the read metrics only after the payload has been consumed:
+                // JClouds streams the content, so the HTTP call actually finishes
+                // only when the stream has been fully read.
+                if (this.offloaderStats != null) {
+                    this.offloaderStats.recordReadOffloadDataLatency(managedLedgerName,
+                            System.nanoTime() - startReadTime, TimeUnit.NANOSECONDS);
+                    this.offloaderStats.recordReadOffloadBytes(managedLedgerName, endRange - startRange + 1);
+                }
             } catch (Throwable e) {
                 if (null != this.offloaderStats) {
                     this.offloaderStats.recordReadOffloadError(this.managedLedgerName);