You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2022/04/15 07:00:45 UTC

[pulsar] branch master updated: Offloader metrics (#13833)

This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 732049fc6ca Offloader metrics (#13833)
732049fc6ca is described below

commit 732049fc6ca1beb046deb43057be2b130736fbca
Author: Tao Jiuming <95...@users.noreply.github.com>
AuthorDate: Fri Apr 15 15:00:38 2022 +0800

    Offloader metrics (#13833)
    
    ### Motivation
    Currently, there is no offload metrics for tiered storage, so it is very hard for us to debug the performance issues. For example , we can not find why offload is slow or why read offload is slow. For above reasons. we need to add some offload metrics for monitoring.
    
    ### Modifications
    Add metrics during offload procedure and read offload data procedure. Including offloadTime, offloadError, offloadRate, readLedgerLatency, writeStoreLatency, writeStoreError, readOffloadIndexLatency, readOffloadDataLatency, readOffloadRate, readOffloadError.
---
 .../bookkeeper/mledger/LedgerOffloaderFactory.java |   8 +-
 .../bookkeeper/mledger/LedgerOffloaderStats.java   |  62 ++++
 .../mledger/LedgerOffloaderStatsDisable.java       |  80 +++++
 .../mledger/impl/LedgerOffloaderStatsImpl.java     | 352 +++++++++++++++++++++
 .../org/apache/pulsar/broker/PulsarService.java    |  14 +-
 .../prometheus/PrometheusMetricsGenerator.java     |   7 +-
 .../broker/stats/LedgerOffloaderMetricsTest.java   | 187 +++++++++++
 .../pulsar/sql/presto/PulsarConnectorCache.java    |  17 +-
 .../pulsar/sql/presto/PulsarConnectorConfig.java   |  36 +++
 .../FileSystemLedgerOffloaderFactory.java          |   6 +-
 .../impl/FileStoreBackedReadHandleImpl.java        |  27 +-
 .../impl/FileSystemManagedLedgerOffloader.java     |  54 +++-
 .../offload/filesystem/FileStoreTestBase.java      |   6 +-
 .../impl/FileSystemManagedLedgerOffloaderTest.java |  36 ++-
 .../jcloud/JCloudLedgerOffloaderFactory.java       |   6 +-
 .../impl/BlobStoreBackedInputStreamImpl.java       |  23 ++
 .../jcloud/impl/BlobStoreBackedReadHandleImpl.java |  15 +-
 .../impl/BlobStoreBackedReadHandleImplV2.java      |  12 +-
 .../impl/BlobStoreManagedLedgerOffloader.java      |  38 ++-
 .../impl/BlockAwareSegmentInputStreamImpl.java     |  22 +-
 ...obStoreManagedLedgerOffloaderStreamingTest.java |   7 +-
 .../impl/BlobStoreManagedLedgerOffloaderTest.java  |  52 ++-
 22 files changed, 996 insertions(+), 71 deletions(-)

diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/LedgerOffloaderFactory.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/LedgerOffloaderFactory.java
index c854996cd16..bb94cee6fde 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/LedgerOffloaderFactory.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/LedgerOffloaderFactory.java
@@ -52,7 +52,8 @@ public interface LedgerOffloaderFactory<T extends LedgerOffloader> {
      */
     T create(OffloadPoliciesImpl offloadPolicies,
              Map<String, String> userMetadata,
-             OrderedScheduler scheduler)
+             OrderedScheduler scheduler,
+             LedgerOffloaderStats offloaderStats)
         throws IOException;
 
     /**
@@ -68,8 +69,9 @@ public interface LedgerOffloaderFactory<T extends LedgerOffloader> {
     default T create(OffloadPoliciesImpl offloadPolicies,
                      Map<String, String> userMetadata,
                      SchemaStorage schemaStorage,
-                     OrderedScheduler scheduler)
+                     OrderedScheduler scheduler,
+                     LedgerOffloaderStats offloaderStats)
             throws IOException {
-        return create(offloadPolicies, userMetadata, scheduler);
+        return create(offloadPolicies, userMetadata, scheduler, offloaderStats);
     }
 }
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/LedgerOffloaderStats.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/LedgerOffloaderStats.java
new file mode 100644
index 00000000000..c9330ff9e05
--- /dev/null
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/LedgerOffloaderStats.java
@@ -0,0 +1,62 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.bookkeeper.mledger;
+
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import org.apache.bookkeeper.common.annotation.InterfaceAudience;
+import org.apache.bookkeeper.common.annotation.InterfaceStability;
+import org.apache.bookkeeper.mledger.impl.LedgerOffloaderStatsImpl;
+
+
+/**
+ * Management Bean for a {@link LedgerOffloader}.
+ */
+@InterfaceAudience.LimitedPrivate
+@InterfaceStability.Stable
+public interface LedgerOffloaderStats extends AutoCloseable {
+
+    void recordOffloadError(String topic);
+
+    void recordOffloadBytes(String topic, long size);
+
+    void recordReadLedgerLatency(String topic, long latency, TimeUnit unit);
+
+    void recordWriteToStorageError(String topic);
+
+    void recordReadOffloadError(String topic);
+
+    void recordReadOffloadBytes(String topic, long size);
+
+    void recordReadOffloadIndexLatency(String topic, long latency, TimeUnit unit);
+
+    void recordReadOffloadDataLatency(String topic, long latency, TimeUnit unit);
+
+    void recordDeleteOffloadOps(String topic, boolean succeed);
+
+
+    static LedgerOffloaderStats create(boolean exposeManagedLedgerStats, boolean exposeTopicLevelMetrics,
+                                       ScheduledExecutorService scheduler, int interval) {
+        if (!exposeManagedLedgerStats) {
+            return LedgerOffloaderStatsDisable.INSTANCE;
+        }
+
+        return LedgerOffloaderStatsImpl.getInstance(exposeTopicLevelMetrics, scheduler, interval);
+    }
+}
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/LedgerOffloaderStatsDisable.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/LedgerOffloaderStatsDisable.java
new file mode 100644
index 00000000000..862d466067e
--- /dev/null
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/LedgerOffloaderStatsDisable.java
@@ -0,0 +1,80 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.bookkeeper.mledger;
+
+import java.util.concurrent.TimeUnit;
+
+class LedgerOffloaderStatsDisable implements LedgerOffloaderStats {
+
+    static final LedgerOffloaderStats INSTANCE = new LedgerOffloaderStatsDisable();
+
+    private LedgerOffloaderStatsDisable() {
+
+    }
+
+    @Override
+    public void recordOffloadError(String topic) {
+
+    }
+
+    @Override
+    public void recordOffloadBytes(String topic, long size) {
+
+    }
+
+    @Override
+    public void recordReadLedgerLatency(String topic, long latency, TimeUnit unit) {
+
+    }
+
+    @Override
+    public void recordWriteToStorageError(String topic) {
+
+    }
+
+    @Override
+    public void recordReadOffloadError(String topic) {
+
+    }
+
+    @Override
+    public void recordReadOffloadBytes(String topic, long size) {
+
+    }
+
+    @Override
+    public void recordReadOffloadIndexLatency(String topic, long latency, TimeUnit unit) {
+
+    }
+
+    @Override
+    public void recordReadOffloadDataLatency(String topic, long latency, TimeUnit unit) {
+
+    }
+
+    @Override
+    public void recordDeleteOffloadOps(String topic, boolean succeed) {
+
+    }
+
+    @Override
+    public void close() throws Exception {
+
+    }
+}
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/LedgerOffloaderStatsImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/LedgerOffloaderStatsImpl.java
new file mode 100644
index 00000000000..1c21cd56445
--- /dev/null
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/LedgerOffloaderStatsImpl.java
@@ -0,0 +1,352 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.bookkeeper.mledger.impl;
+
+import com.google.common.annotations.VisibleForTesting;
+import io.prometheus.client.CollectorRegistry;
+import io.prometheus.client.Counter;
+import io.prometheus.client.Gauge;
+import io.prometheus.client.Summary;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.LongAdder;
+import java.util.stream.Collectors;
+import org.apache.bookkeeper.mledger.LedgerOffloaderStats;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.commons.lang3.tuple.ImmutablePair;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.pulsar.common.naming.TopicName;
+
+public final class LedgerOffloaderStatsImpl implements LedgerOffloaderStats, Runnable {
+    private static final String TOPIC_LABEL = "topic";
+    private static final String NAMESPACE_LABEL = "namespace";
+    private static final String UNKNOWN = "unknown";
+    private static final String STATUS = "status";
+    private static final String SUCCEED = "succeed";
+    private static final String FAILED = "failed";
+
+    private final boolean exposeTopicLevelMetrics;
+    private final int interval;
+
+    private final Counter offloadError;
+    private final Gauge offloadRate;
+    private final Counter deleteOffloadOps;
+    private final Summary readLedgerLatency;
+    private final Counter writeStorageError;
+    private final Counter readOffloadError;
+    private final Gauge readOffloadRate;
+    private final Summary readOffloadIndexLatency;
+    private final Summary readOffloadDataLatency;
+
+    private final Map<String, Long> topicAccess;
+    private final Map<String, String> topic2Namespace;
+    private final Map<String, Pair<LongAdder, LongAdder>> offloadAndReadOffloadBytesMap;
+
+    final AtomicBoolean closed = new AtomicBoolean(false);
+
+     private LedgerOffloaderStatsImpl(boolean exposeTopicLevelMetrics,
+                                     ScheduledExecutorService scheduler, int interval) {
+        this.interval = interval;
+        this.exposeTopicLevelMetrics = exposeTopicLevelMetrics;
+        if (null != scheduler) {
+            scheduler.scheduleAtFixedRate(this, interval, interval, TimeUnit.SECONDS);
+        }
+
+        this.topicAccess = new ConcurrentHashMap<>();
+        this.topic2Namespace = new ConcurrentHashMap<>();
+        this.offloadAndReadOffloadBytesMap = new ConcurrentHashMap<>();
+
+        String[] labels = exposeTopicLevelMetrics
+                ? new String[]{NAMESPACE_LABEL, TOPIC_LABEL} : new String[]{NAMESPACE_LABEL};
+
+        this.offloadError = Counter.build("brk_ledgeroffloader_offload_error", "-")
+                .labelNames(labels).create().register();
+        this.offloadRate = Gauge.build("brk_ledgeroffloader_offload_rate", "-")
+                .labelNames(labels).create().register();
+
+        this.readOffloadError = Counter.build("brk_ledgeroffloader_read_offload_error", "-")
+                .labelNames(labels).create().register();
+        this.readOffloadRate = Gauge.build("brk_ledgeroffloader_read_offload_rate", "-")
+                .labelNames(labels).create().register();
+        this.writeStorageError = Counter.build("brk_ledgeroffloader_write_storage_error", "-")
+                .labelNames(labels).create().register();
+
+        this.readOffloadIndexLatency = Summary.build("brk_ledgeroffloader_read_offload_index_latency", "-")
+                .labelNames(labels).create().register();
+        this.readOffloadDataLatency = Summary.build("brk_ledgeroffloader_read_offload_data_latency", "-")
+                .labelNames(labels).create().register();
+        this.readLedgerLatency = Summary.build("brk_ledgeroffloader_read_ledger_latency", "-")
+                .labelNames(labels).create().register();
+
+        String[] deleteOpsLabels = exposeTopicLevelMetrics
+                ? new String[]{NAMESPACE_LABEL, TOPIC_LABEL, STATUS} : new String[]{NAMESPACE_LABEL, STATUS};
+        this.deleteOffloadOps = Counter.build("brk_ledgeroffloader_delete_offload_ops", "-")
+                .labelNames(deleteOpsLabels).create().register();
+    }
+
+
+    private static LedgerOffloaderStats instance;
+    public static synchronized LedgerOffloaderStats getInstance(boolean exposeTopicLevelMetrics,
+                                                                ScheduledExecutorService scheduler, int interval) {
+        if (null == instance) {
+            instance = new LedgerOffloaderStatsImpl(exposeTopicLevelMetrics, scheduler, interval);
+        }
+
+        return instance;
+    }
+
+    @Override
+    public void recordOffloadError(String topic) {
+        String[] labelValues = this.labelValues(topic);
+        this.offloadError.labels(labelValues).inc();
+        this.addOrUpdateTopicAccess(topic);
+    }
+
+    @Override
+    public void recordOffloadBytes(String topic, long size) {
+        topic = StringUtils.isBlank(topic) ? UNKNOWN : topic;
+        Pair<LongAdder, LongAdder> pair = this.offloadAndReadOffloadBytesMap
+                .computeIfAbsent(topic, __ -> new ImmutablePair<>(new LongAdder(), new LongAdder()));
+        pair.getLeft().add(size);
+        this.addOrUpdateTopicAccess(topic);
+    }
+
+    @Override
+    public void recordReadLedgerLatency(String topic, long latency, TimeUnit unit) {
+        String[] labelValues = this.labelValues(topic);
+        this.readLedgerLatency.labels(labelValues).observe(unit.toMicros(latency));
+        this.addOrUpdateTopicAccess(topic);
+    }
+
+    @Override
+    public void recordWriteToStorageError(String topic) {
+        String[] labelValues = this.labelValues(topic);
+        this.writeStorageError.labels(labelValues).inc();
+        this.addOrUpdateTopicAccess(topic);
+    }
+
+    @Override
+    public void recordReadOffloadError(String topic) {
+        String[] labelValues = this.labelValues(topic);
+        this.readOffloadError.labels(labelValues).inc();
+        this.addOrUpdateTopicAccess(topic);
+    }
+
+    @Override
+    public void recordReadOffloadBytes(String topic, long size) {
+        topic = StringUtils.isBlank(topic) ? UNKNOWN : topic;
+        Pair<LongAdder, LongAdder> pair = this.offloadAndReadOffloadBytesMap
+                .computeIfAbsent(topic, __ -> new ImmutablePair<>(new LongAdder(), new LongAdder()));
+        pair.getRight().add(size);
+        this.addOrUpdateTopicAccess(topic);
+    }
+
+    @Override
+    public void recordReadOffloadIndexLatency(String topic, long latency, TimeUnit unit) {
+        String[] labelValues = this.labelValues(topic);
+        this.readOffloadIndexLatency.labels(labelValues).observe(unit.toMicros(latency));
+        this.addOrUpdateTopicAccess(topic);
+    }
+
+    @Override
+    public void recordReadOffloadDataLatency(String topic, long latency, TimeUnit unit) {
+        String[] labelValues = this.labelValues(topic);
+        this.readOffloadDataLatency.labels(labelValues).observe(unit.toMicros(latency));
+        this.addOrUpdateTopicAccess(topic);
+    }
+
+    @Override
+    public void recordDeleteOffloadOps(String topic, boolean succeed) {
+        String status = succeed ? SUCCEED : FAILED;
+        String[] labelValues = this.labelValues(topic, status);
+        this.deleteOffloadOps.labels(labelValues).inc();
+        this.addOrUpdateTopicAccess(topic);
+    }
+
+    private void addOrUpdateTopicAccess(String topic) {
+        topic = StringUtils.isBlank(topic) ? UNKNOWN : topic;
+        this.topicAccess.put(topic, System.currentTimeMillis());
+    }
+
+    private String[] labelValues(String topic, String status) {
+        if (StringUtils.isBlank(topic)) {
+            return exposeTopicLevelMetrics ? new String[]{UNKNOWN, UNKNOWN, status} : new String[]{UNKNOWN, status};
+        }
+        String namespace = this.getNamespace(topic);
+        return this.exposeTopicLevelMetrics ? new String[]{namespace, topic, status} : new String[]{namespace, status};
+    }
+
+    private String[] labelValues(String topic) {
+        if (StringUtils.isBlank(topic)) {
+            return this.exposeTopicLevelMetrics ? new String[]{UNKNOWN, UNKNOWN} : new String[]{UNKNOWN};
+        }
+        String namespace = this.getNamespace(topic);
+        return this.exposeTopicLevelMetrics ? new String[]{namespace, topic} : new String[]{namespace};
+    }
+
+    private String getNamespace(String topic) {
+        return this.topic2Namespace.computeIfAbsent(topic, t -> {
+            try {
+                return TopicName.get(t).getNamespace();
+            } catch (IllegalArgumentException ex) {
+                return UNKNOWN;
+            }
+        });
+    }
+
+    private void cleanExpiredTopicMetrics() {
+        long now = System.currentTimeMillis();
+        long timeout = TimeUnit.MINUTES.toMillis(2);
+
+        topicAccess.entrySet().removeIf(entry -> {
+            String topic = entry.getKey();
+            long access = entry.getValue();
+
+            if (now - access >= timeout) {
+                this.topic2Namespace.remove(topic);
+                this.offloadAndReadOffloadBytesMap.remove(topic);
+                String[] labelValues = this.labelValues(topic);
+                this.offloadError.remove(labelValues);
+                this.offloadRate.remove(labelValues);
+                this.readLedgerLatency.remove(labelValues);
+                this.writeStorageError.remove(labelValues);
+                this.readOffloadError.remove(labelValues);
+                this.readOffloadRate.remove(labelValues);
+                this.readOffloadIndexLatency.remove(labelValues);
+                this.readOffloadDataLatency.remove(labelValues);
+
+                labelValues = this.labelValues(topic, SUCCEED);
+                this.deleteOffloadOps.remove(labelValues);
+                labelValues = this.labelValues(topic, FAILED);
+                this.deleteOffloadOps.remove(labelValues);
+
+                return true;
+            }
+            return false;
+        });
+    }
+
+    @Override
+    public void run() {
+        this.cleanExpiredTopicMetrics();
+
+        this.offloadAndReadOffloadBytesMap.forEach((topic, pair) -> {
+            String[] labelValues = this.labelValues(topic);
+
+            double interval = this.interval;
+            long offloadBytes = pair.getLeft().sumThenReset();
+            long readOffloadBytes = pair.getRight().sumThenReset();
+
+            this.offloadRate.labels(labelValues).set(offloadBytes / interval);
+            this.readOffloadRate.labels(labelValues).set(readOffloadBytes / interval);
+        });
+    }
+
+    @Override
+    public void close() throws Exception {
+        if (instance == this && this.closed.compareAndSet(false, true)) {
+            CollectorRegistry.defaultRegistry.unregister(this.offloadError);
+            CollectorRegistry.defaultRegistry.unregister(this.offloadRate);
+            CollectorRegistry.defaultRegistry.unregister(this.readLedgerLatency);
+            CollectorRegistry.defaultRegistry.unregister(this.writeStorageError);
+            CollectorRegistry.defaultRegistry.unregister(this.readOffloadError);
+            CollectorRegistry.defaultRegistry.unregister(this.readOffloadRate);
+            CollectorRegistry.defaultRegistry.unregister(this.readOffloadIndexLatency);
+            CollectorRegistry.defaultRegistry.unregister(this.readOffloadDataLatency);
+            this.topic2Namespace.clear();
+            this.offloadAndReadOffloadBytesMap.clear();
+        }
+    }
+
+    @VisibleForTesting
+    public long getOffloadBytes(String topic) {
+        if (this.exposeTopicLevelMetrics) {
+            Pair<LongAdder, LongAdder> pair = this.offloadAndReadOffloadBytesMap.get(topic);
+            return pair.getLeft().sum();
+        }
+
+        String namespace = this.topic2Namespace.get(topic);
+        List<String> topics = this.offloadAndReadOffloadBytesMap.keySet().stream()
+                .filter(topicName -> topicName.contains(namespace)).collect(Collectors.toList());
+
+        long totalBytes = 0;
+        for (String key : topics) {
+            totalBytes += this.offloadAndReadOffloadBytesMap.get(key).getLeft().sum();
+        }
+        return totalBytes;
+    }
+
+    @VisibleForTesting
+    public long getOffloadError(String topic) {
+        String[] labels = this.labelValues(topic);
+        return (long) this.offloadError.labels(labels).get();
+    }
+
+    @VisibleForTesting
+    public long getWriteStorageError(String topic) {
+        String[] labels = this.labelValues(topic);
+        return (long) this.writeStorageError.labels(labels).get();
+    }
+
+    @VisibleForTesting
+    public long getReadOffloadError(String topic) {
+        String[] labels = this.labelValues(topic);
+        return (long) this.readOffloadError.labels(labels).get();
+    }
+
+    @VisibleForTesting
+    public long getReadOffloadBytes(String topic) {
+        if (this.exposeTopicLevelMetrics) {
+            Pair<LongAdder, LongAdder> pair = this.offloadAndReadOffloadBytesMap.get(topic);
+            return pair.getRight().sum();
+        }
+
+        String namespace = this.topic2Namespace.get(topic);
+        List<String> topics = this.offloadAndReadOffloadBytesMap.keySet().stream()
+                .filter(topicName -> topicName.contains(namespace)).collect(Collectors.toList());
+
+        long totalBytes = 0;
+        for (String key : topics) {
+            totalBytes += this.offloadAndReadOffloadBytesMap.get(key).getRight().sum();
+        }
+        return totalBytes;
+    }
+
+    @VisibleForTesting
+    public Summary.Child.Value getReadLedgerLatency(String topic) {
+        String[] labels = this.labelValues(topic);
+        return this.readLedgerLatency.labels(labels).get();
+    }
+
+    @VisibleForTesting
+    public Summary.Child.Value getReadOffloadIndexLatency(String topic) {
+        String[] labels = this.labelValues(topic);
+        return this.readOffloadIndexLatency.labels(labels).get();
+    }
+
+    @VisibleForTesting
+    public Summary.Child.Value getReadOffloadDataLatency(String topic) {
+        String[] labels = this.labelValues(topic);
+        return this.readOffloadDataLatency.labels(labels).get();
+    }
+}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
index 76b0a624037..35cd22d4bbe 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
@@ -73,6 +73,7 @@ import org.apache.bookkeeper.common.util.OrderedScheduler;
 import org.apache.bookkeeper.conf.ClientConfiguration;
 import org.apache.bookkeeper.mledger.LedgerOffloader;
 import org.apache.bookkeeper.mledger.LedgerOffloaderFactory;
+import org.apache.bookkeeper.mledger.LedgerOffloaderStats;
 import org.apache.bookkeeper.mledger.ManagedLedgerFactory;
 import org.apache.bookkeeper.mledger.impl.NullLedgerOffloader;
 import org.apache.bookkeeper.mledger.offload.Offloaders;
@@ -204,6 +205,7 @@ public class PulsarService implements AutoCloseable, ShutdownService {
     private OrderedScheduler offloaderScheduler;
     private OffloadersCache offloadersCache = new OffloadersCache();
     private LedgerOffloader defaultOffloader;
+    private final LedgerOffloaderStats offloaderStats;
     private Map<NamespaceName, LedgerOffloader> ledgerOffloaderMap = new ConcurrentHashMap<>();
     private ScheduledFuture<?> loadReportTask = null;
     private ScheduledFuture<?> loadSheddingTask = null;
@@ -336,6 +338,11 @@ public class PulsarService implements AutoCloseable, ShutdownService {
                 new ExecutorProvider(1, "broker-client-shared-external-executor");
         this.brokerClientSharedTimer =
                 new HashedWheelTimer(new DefaultThreadFactory("broker-client-shared-timer"), 1, TimeUnit.MILLISECONDS);
+
+        int interval = config.getManagedLedgerStatsPeriodSeconds();
+        boolean exposeTopicMetrics = config.isExposeTopicLevelMetricsInPrometheus();
+        this.offloaderStats = LedgerOffloaderStats.create(config.isExposeManagedLedgerMetricsInPrometheus(),
+                exposeTopicMetrics, this.getOffloaderScheduler(), interval);
     }
 
     public MetadataStore createConfigurationMetadataStore() throws MetadataStoreException {
@@ -530,6 +537,9 @@ public class PulsarService implements AutoCloseable, ShutdownService {
             if (transactionExecutorProvider != null) {
                 transactionExecutorProvider.shutdownNow();
             }
+            if (this.offloaderStats != null) {
+                this.offloaderStats.close();
+            }
 
             brokerClientSharedExternalExecutorProvider.shutdownNow();
             brokerClientSharedInternalExecutorProvider.shutdownNow();
@@ -1259,6 +1269,7 @@ public class PulsarService implements AutoCloseable, ShutdownService {
                 checkNotNull(offloadPolicies.getOffloadersDirectory(),
                     "Offloader driver is configured to be '%s' but no offloaders directory is configured.",
                         offloadPolicies.getManagedLedgerOffloadDriver());
+
                 Offloaders offloaders = offloadersCache.getOrLoadOffloaders(
                         offloadPolicies.getOffloadersDirectory(), config.getNarExtractionDirectory());
 
@@ -1272,8 +1283,7 @@ public class PulsarService implements AutoCloseable, ShutdownService {
                             LedgerOffloader.METADATA_SOFTWARE_GITSHA_KEY.toLowerCase(), PulsarVersion.getGitSha(),
                             LedgerOffloader.METADATA_PULSAR_CLUSTER_NAME.toLowerCase(), config.getClusterName()
                         ),
-                        schemaStorage,
-                        getOffloaderScheduler(offloadPolicies));
+                        schemaStorage, getOffloaderScheduler(offloadPolicies), this.offloaderStats);
                 } catch (IOException ioe) {
                     throw new PulsarServerException(ioe.getMessage(), ioe.getCause());
                 }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/PrometheusMetricsGenerator.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/PrometheusMetricsGenerator.java
index e0235817f0c..bf2884e2b3a 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/PrometheusMetricsGenerator.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/PrometheusMetricsGenerator.java
@@ -38,6 +38,7 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import lombok.extern.slf4j.Slf4j;
 import org.apache.bookkeeper.stats.NullStatsProvider;
 import org.apache.bookkeeper.stats.StatsProvider;
 import org.apache.pulsar.PulsarVersion;
@@ -53,6 +54,7 @@ import org.apache.pulsar.common.util.SimpleTextOutputStream;
  * in a text format suitable to be consumed by Prometheus.
  * Format specification can be found at <a href="https://prometheus.io/docs/instrumenting/exposition_formats/">Exposition Formats</a>
  */
+@Slf4j
 public class PrometheusMetricsGenerator {
 
     static {
@@ -199,13 +201,16 @@ public class PrometheusMetricsGenerator {
                             .write("{cluster=\"").write(cluster).write('"');
                 }
 
+                //to avoid quantile label duplicated
+                boolean appendedQuantile = false;
                 for (Map.Entry<String, String> metric : metrics1.getDimensions().entrySet()) {
                     if (metric.getKey().isEmpty() || "cluster".equals(metric.getKey())) {
                         continue;
                     }
                     stream.write(", ").write(metric.getKey()).write("=\"").write(metric.getValue()).write('"');
-                    if (value != null && !value.isEmpty()) {
+                    if (value != null && !value.isEmpty() && !appendedQuantile) {
                         stream.write(", ").write("quantile=\"").write(value).write('"');
+                        appendedQuantile = true;
                     }
                 }
                 stream.write("} ").write(String.valueOf(entry.getValue()))
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/LedgerOffloaderMetricsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/LedgerOffloaderMetricsTest.java
new file mode 100644
index 00000000000..c6c8f07b868
--- /dev/null
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/LedgerOffloaderMetricsTest.java
@@ -0,0 +1,187 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.stats;
+
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.spy;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.Map;
+import java.util.List;
+import java.util.Optional;
+import java.util.Queue;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+import org.apache.bookkeeper.mledger.LedgerOffloader;
+import org.apache.bookkeeper.mledger.ManagedLedger;
+import org.apache.bookkeeper.mledger.ManagedLedgerConfig;
+import org.apache.bookkeeper.mledger.impl.LedgerOffloaderStatsImpl;
+import org.apache.pulsar.broker.service.BrokerService;
+import org.apache.pulsar.broker.service.BrokerTestBase;
+import org.apache.pulsar.broker.service.Topic;
+import org.apache.pulsar.broker.service.persistent.PersistentTopic;
+import org.junit.Assert;
+import org.mockito.Mockito;
+import org.mockito.stubbing.Answer;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.Test;
+
+public class LedgerOffloaderMetricsTest  extends BrokerTestBase {
+
+    @Override
+    protected void setup() throws Exception {
+    }
+
+    @AfterMethod(alwaysRun = true)
+    @Override
+    protected void cleanup() throws Exception {
+        super.internalCleanup();
+    }
+
+    @Test
+    public void testTopicLevelMetrics() throws Exception {
+        conf.setExposeTopicLevelMetricsInPrometheus(true);
+        super.baseSetup();
+
+        String ns1 = "prop/ns-abc1";
+        admin.namespaces().createNamespace(ns1);
+        String []topics = new String[3];
+
+        LedgerOffloaderStatsImpl offloaderStats = (LedgerOffloaderStatsImpl) pulsar.getOffloaderStats();
+
+        LedgerOffloader offloader = Mockito.mock(LedgerOffloader.class);
+        Topic topic = Mockito.mock(PersistentTopic.class);
+        CompletableFuture<Optional<Topic>> topicFuture = new CompletableFuture<>();
+        Optional<Topic> topicOptional = Optional.of(topic);
+        topicFuture.complete(topicOptional);
+        BrokerService brokerService = spy(pulsar.getBrokerService());
+        doReturn(brokerService).when(pulsar).getBrokerService();
+
+
+        for (int i = 0; i < 3; i++) {
+            String topicName = "persistent://prop/ns-abc1/testMetrics" + UUID.randomUUID();
+            topics[i] = topicName;
+            admin.topics().createNonPartitionedTopic(topicName);
+
+            doReturn(topicFuture).when(brokerService).getTopicIfExists(topicName);
+            Assert.assertTrue(topic instanceof PersistentTopic);
+
+            ManagedLedger ledgerM = Mockito.mock(ManagedLedger.class);
+            doReturn(ledgerM).when(((PersistentTopic) topic)).getManagedLedger();
+            ManagedLedgerConfig config = Mockito.mock(ManagedLedgerConfig.class);
+            doReturn(config).when(ledgerM).getConfig();
+            doReturn(offloader).when(config).getLedgerOffloader();
+
+            offloaderStats.recordOffloadError(topicName);
+            offloaderStats.recordOffloadError(topicName);
+            offloaderStats.recordOffloadBytes(topicName, 100);
+            offloaderStats.recordReadLedgerLatency(topicName, 1000, TimeUnit.NANOSECONDS);
+            offloaderStats.recordReadOffloadError(topicName);
+            offloaderStats.recordReadOffloadError(topicName);
+            offloaderStats.recordReadOffloadIndexLatency(topicName, 1000000L, TimeUnit.NANOSECONDS);
+            offloaderStats.recordReadOffloadBytes(topicName, 100000);
+            offloaderStats.recordWriteToStorageError(topicName);
+            offloaderStats.recordWriteToStorageError(topicName);
+        }
+
+        for (String topicName : topics) {
+            Assert.assertEquals(offloaderStats.getOffloadError(topicName), 2);
+            Assert.assertEquals(offloaderStats.getOffloadBytes(topicName) , 100);
+            Assert.assertEquals((long) offloaderStats.getReadLedgerLatency(topicName).sum, 1);
+            Assert.assertEquals(offloaderStats.getReadOffloadError(topicName), 2);
+            Assert.assertEquals((long) offloaderStats.getReadOffloadIndexLatency(topicName).sum ,1000);
+            Assert.assertEquals(offloaderStats.getReadOffloadBytes(topicName), 100000);
+            Assert.assertEquals(offloaderStats.getWriteStorageError(topicName), 2);
+        }
+    }
+
+    @Test
+    public void testNamespaceLevelMetrics() throws Exception {
+        conf.setExposeTopicLevelMetricsInPrometheus(false);
+        super.baseSetup();
+
+        String ns1 = "prop/ns-abc1";
+        String ns2 = "prop/ns-abc2";
+
+        LedgerOffloaderStatsImpl offloaderStats = (LedgerOffloaderStatsImpl) pulsar.getOffloaderStats();
+
+        LedgerOffloader offloader = Mockito.mock(LedgerOffloader.class);
+        Topic topic = Mockito.mock(PersistentTopic.class);
+        CompletableFuture<Optional<Topic>> topicFuture = new CompletableFuture<>();
+        Optional<Topic> topicOptional = Optional.of(topic);
+        topicFuture.complete(topicOptional);
+        BrokerService brokerService = spy(pulsar.getBrokerService());
+        doReturn(brokerService).when(pulsar).getBrokerService();
+        Queue<String> queue = new LinkedList<>();
+        Map<String, List<String>> namespace2Topics = new HashMap<>();
+        for (int s = 0; s < 2; s++) {
+            String nameSpace = ns1;
+            if (s == 1) {
+                nameSpace = ns2;
+            }
+            namespace2Topics.put(nameSpace, new ArrayList<>());
+
+            admin.namespaces().createNamespace(nameSpace);
+            String baseTopic1 = "persistent://" + nameSpace + "/testMetrics";
+            for (int i = 0; i < 6; i++) {
+                String topicName = baseTopic1 + UUID.randomUUID();
+                List<String> topicList = namespace2Topics.get(nameSpace);
+                topicList.add(topicName);
+
+                queue.add(topicName);
+                admin.topics().createNonPartitionedTopic(topicName);
+                doReturn(topicFuture).when(brokerService).getTopicIfExists(topicName);
+                Assert.assertTrue(topic instanceof PersistentTopic);
+
+
+                ManagedLedger ledgerM = Mockito.mock(ManagedLedger.class);
+                doReturn(ledgerM).when(((PersistentTopic) topic)).getManagedLedger();
+                ManagedLedgerConfig config = Mockito.mock(ManagedLedgerConfig.class);
+                doReturn(config).when(ledgerM).getConfig();
+                doReturn(offloader).when(config).getLedgerOffloader();
+                Mockito.when(ledgerM.getName()).thenAnswer((Answer<String>) invocation -> queue.poll());
+
+                offloaderStats.recordOffloadError(topicName);
+                offloaderStats.recordOffloadBytes(topicName, 100);
+                offloaderStats.recordReadLedgerLatency(topicName, 1000, TimeUnit.NANOSECONDS);
+                offloaderStats.recordReadOffloadError(topicName);
+                offloaderStats.recordReadOffloadIndexLatency(topicName, 1000000L, TimeUnit.NANOSECONDS);
+                offloaderStats.recordReadOffloadBytes(topicName, 100000);
+                offloaderStats.recordWriteToStorageError(topicName);
+            }
+        }
+
+        for (Map.Entry<String, List<String>> entry : namespace2Topics.entrySet()) {
+            String namespace = entry.getKey();
+            List<String> topics = entry.getValue();
+            String topicName = topics.get(0);
+
+            Assert.assertTrue(offloaderStats.getOffloadError(topicName) >= 1);
+            Assert.assertTrue(offloaderStats.getOffloadBytes(topicName) >= 100);
+            Assert.assertTrue((long) offloaderStats.getReadLedgerLatency(topicName).sum >= 1);
+            Assert.assertTrue(offloaderStats.getReadOffloadError(topicName) >= 1);
+            Assert.assertTrue((long) offloaderStats.getReadOffloadIndexLatency(topicName).sum >= 1000);
+            Assert.assertTrue(offloaderStats.getReadOffloadBytes(topicName) >= 100000);
+            Assert.assertTrue(offloaderStats.getWriteStorageError(topicName) >= 1);
+        }
+    }
+
+}
diff --git a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarConnectorCache.java b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarConnectorCache.java
index 9a64c055d07..f6a4771f9e4 100644
--- a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarConnectorCache.java
+++ b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarConnectorCache.java
@@ -30,6 +30,7 @@ import org.apache.bookkeeper.common.util.OrderedScheduler;
 import org.apache.bookkeeper.conf.ClientConfiguration;
 import org.apache.bookkeeper.mledger.LedgerOffloader;
 import org.apache.bookkeeper.mledger.LedgerOffloaderFactory;
+import org.apache.bookkeeper.mledger.LedgerOffloaderStats;
 import org.apache.bookkeeper.mledger.ManagedLedgerConfig;
 import org.apache.bookkeeper.mledger.ManagedLedgerFactory;
 import org.apache.bookkeeper.mledger.ManagedLedgerFactoryConfig;
@@ -61,6 +62,7 @@ public class PulsarConnectorCache {
 
     private final StatsProvider statsProvider;
     private OrderedScheduler offloaderScheduler;
+    private final LedgerOffloaderStats offloaderStats;
     private OffloadersCache offloadersCache = new OffloadersCache();
     private LedgerOffloader defaultOffloader;
     private Map<NamespaceName, LedgerOffloader> offloaderMap = new ConcurrentHashMap<>();
@@ -84,6 +86,14 @@ public class PulsarConnectorCache {
 
         this.statsProvider.start(clientConfiguration);
 
+        this.initOffloaderScheduler(pulsarConnectorConfig.getOffloadPolices());
+
+        int period = pulsarConnectorConfig.getManagedLedgerStatsPeriodSeconds();
+        boolean exposeTopicLevelMetrics = pulsarConnectorConfig.isExposeTopicLevelMetricsInPrometheus();
+        this.offloaderStats =
+                LedgerOffloaderStats.create(pulsarConnectorConfig.isExposeManagedLedgerMetricsInPrometheus(),
+                        exposeTopicLevelMetrics, offloaderScheduler, period);
+
         this.defaultOffloader = initManagedLedgerOffloader(
                 pulsarConnectorConfig.getOffloadPolices(), pulsarConnectorConfig);
     }
@@ -142,13 +152,10 @@ public class PulsarConnectorCache {
         return managedLedgerConfig;
     }
 
-    private synchronized OrderedScheduler getOffloaderScheduler(OffloadPoliciesImpl offloadPolicies) {
-        if (this.offloaderScheduler == null) {
+    private void initOffloaderScheduler(OffloadPoliciesImpl offloadPolicies) {
             this.offloaderScheduler = OrderedScheduler.newSchedulerBuilder()
                     .numThreads(offloadPolicies.getManagedLedgerOffloadMaxThreads())
                     .name("pulsar-offloader").build();
-        }
-        return this.offloaderScheduler;
     }
 
     private LedgerOffloader initManagedLedgerOffloader(OffloadPoliciesImpl offloadPolicies,
@@ -171,7 +178,7 @@ public class PulsarConnectorCache {
                             LedgerOffloader.METADATA_SOFTWARE_VERSION_KEY.toLowerCase(), PulsarVersion.getVersion(),
                             LedgerOffloader.METADATA_SOFTWARE_GITSHA_KEY.toLowerCase(), PulsarVersion.getGitSha()
                         ),
-                        getOffloaderScheduler(offloadPolicies));
+                        this.offloaderScheduler, this.offloaderStats);
                 } catch (IOException ioe) {
                     log.error("Failed to create offloader: ", ioe);
                     throw new RuntimeException(ioe.getMessage(), ioe.getCause());
diff --git a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarConnectorConfig.java b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarConnectorConfig.java
index abaf9a6fbce..0e5c2b4e95f 100644
--- a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarConnectorConfig.java
+++ b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarConnectorConfig.java
@@ -69,6 +69,11 @@ public class PulsarConnectorConfig implements AutoCloseable {
     private String offloadersDirectory = "./offloaders";
     private Map<String, String> offloaderProperties = new HashMap<>();
 
+    //--- Ledger metrics ---
+    private boolean exposeTopicLevelMetricsInPrometheus = false;
+    private boolean exposeManagedLedgerMetricsInPrometheus = false;
+    private int managedLedgerStatsPeriodSeconds = 60;
+
     private PulsarAdmin pulsarAdmin;
 
     // --- Bookkeeper
@@ -277,6 +282,37 @@ public class PulsarConnectorConfig implements AutoCloseable {
         return this;
     }
 
+    @Config("pulsar.expose-topic-level-metrics-in-prometheus")
+    public PulsarConnectorConfig setExposeTopicLevelMetricsInPrometheus(boolean exposeTopicLevelMetricsInPrometheus) {
+        this.exposeTopicLevelMetricsInPrometheus = exposeTopicLevelMetricsInPrometheus;
+        return this;
+    }
+
+    public boolean isExposeTopicLevelMetricsInPrometheus() {
+        return exposeTopicLevelMetricsInPrometheus;
+    }
+
+    @Config("pulsar.expose-managed-ledger-metrics-in-prometheus")
+    public PulsarConnectorConfig setExposeManagedLedgerMetricsInPrometheus(
+            boolean exposeManagedLedgerMetricsInPrometheus) {
+        this.exposeManagedLedgerMetricsInPrometheus = exposeManagedLedgerMetricsInPrometheus;
+        return this;
+    }
+
+    public boolean isExposeManagedLedgerMetricsInPrometheus() {
+        return exposeManagedLedgerMetricsInPrometheus;
+    }
+
+    @Config("pulsar.managed-ledger-stats-period-seconds")
+    public PulsarConnectorConfig setManagedLedgerStatsPeriodSeconds(int managedLedgerStatsPeriodSeconds) {
+        this.managedLedgerStatsPeriodSeconds = managedLedgerStatsPeriodSeconds;
+        return this;
+    }
+
+    public int getManagedLedgerStatsPeriodSeconds() {
+        return managedLedgerStatsPeriodSeconds;
+    }
+
     // --- Authentication ---
 
     public String getAuthPlugin() {
diff --git a/tiered-storage/file-system/src/main/java/org/apache/bookkeeper/mledger/offload/filesystem/FileSystemLedgerOffloaderFactory.java b/tiered-storage/file-system/src/main/java/org/apache/bookkeeper/mledger/offload/filesystem/FileSystemLedgerOffloaderFactory.java
index 4b32f77bc1f..c7876f9941a 100644
--- a/tiered-storage/file-system/src/main/java/org/apache/bookkeeper/mledger/offload/filesystem/FileSystemLedgerOffloaderFactory.java
+++ b/tiered-storage/file-system/src/main/java/org/apache/bookkeeper/mledger/offload/filesystem/FileSystemLedgerOffloaderFactory.java
@@ -22,6 +22,7 @@ import java.io.IOException;
 import java.util.Map;
 import org.apache.bookkeeper.common.util.OrderedScheduler;
 import org.apache.bookkeeper.mledger.LedgerOffloaderFactory;
+import org.apache.bookkeeper.mledger.LedgerOffloaderStats;
 import org.apache.bookkeeper.mledger.offload.filesystem.impl.FileSystemManagedLedgerOffloader;
 import org.apache.pulsar.common.policies.data.OffloadPoliciesImpl;
 
@@ -34,7 +35,8 @@ public class FileSystemLedgerOffloaderFactory implements LedgerOffloaderFactory<
     @Override
     public FileSystemManagedLedgerOffloader create(OffloadPoliciesImpl offloadPolicies,
                                                    Map<String, String> userMetadata,
-                                                   OrderedScheduler scheduler) throws IOException {
-        return FileSystemManagedLedgerOffloader.create(offloadPolicies, scheduler);
+                                                   OrderedScheduler scheduler,
+                                                   LedgerOffloaderStats offloaderStats) throws IOException {
+        return FileSystemManagedLedgerOffloader.create(offloadPolicies, scheduler, offloaderStats);
     }
 }
diff --git a/tiered-storage/file-system/src/main/java/org/apache/bookkeeper/mledger/offload/filesystem/impl/FileStoreBackedReadHandleImpl.java b/tiered-storage/file-system/src/main/java/org/apache/bookkeeper/mledger/offload/filesystem/impl/FileStoreBackedReadHandleImpl.java
index 5221b15d5d9..ecc0cfc23b3 100644
--- a/tiered-storage/file-system/src/main/java/org/apache/bookkeeper/mledger/offload/filesystem/impl/FileStoreBackedReadHandleImpl.java
+++ b/tiered-storage/file-system/src/main/java/org/apache/bookkeeper/mledger/offload/filesystem/impl/FileStoreBackedReadHandleImpl.java
@@ -27,6 +27,7 @@ import java.util.List;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
 import org.apache.bookkeeper.client.BKException;
 import org.apache.bookkeeper.client.api.LastConfirmedAndEntry;
 import org.apache.bookkeeper.client.api.LedgerEntries;
@@ -35,6 +36,7 @@ import org.apache.bookkeeper.client.api.LedgerMetadata;
 import org.apache.bookkeeper.client.api.ReadHandle;
 import org.apache.bookkeeper.client.impl.LedgerEntriesImpl;
 import org.apache.bookkeeper.client.impl.LedgerEntryImpl;
+import org.apache.bookkeeper.mledger.LedgerOffloaderStats;
 import org.apache.hadoop.io.BytesWritable;
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.MapFile;
@@ -47,18 +49,25 @@ public class FileStoreBackedReadHandleImpl implements ReadHandle {
     private final MapFile.Reader reader;
     private final long ledgerId;
     private final LedgerMetadata ledgerMetadata;
+    private final LedgerOffloaderStats offloaderStats;
+    private final String managedLedgerName;
 
-    private FileStoreBackedReadHandleImpl(ExecutorService executor,
-                                          MapFile.Reader reader,
-                                          long ledgerId) throws IOException {
+    private FileStoreBackedReadHandleImpl(ExecutorService executor, MapFile.Reader reader, long ledgerId,
+                                          LedgerOffloaderStats offloaderStats,
+                                          String managedLedgerName) throws IOException {
         this.ledgerId = ledgerId;
         this.executor = executor;
         this.reader = reader;
+        this.offloaderStats = offloaderStats;
+        this.managedLedgerName = managedLedgerName;
         LongWritable key = new LongWritable();
         BytesWritable value = new BytesWritable();
         try {
             key.set(FileSystemManagedLedgerOffloader.METADATA_KEY_INDEX);
+            long startReadIndexTime = System.nanoTime();
             reader.get(key, value);
+            offloaderStats.recordReadOffloadIndexLatency(managedLedgerName,
+                    System.nanoTime() - startReadIndexTime, TimeUnit.NANOSECONDS);
             this.ledgerMetadata = parseLedgerMetadata(ledgerId, value.copyBytes());
         } catch (IOException e) {
             log.error("Fail to read LedgerMetadata for ledgerId {}",
@@ -113,7 +122,10 @@ public class FileStoreBackedReadHandleImpl implements ReadHandle {
                 key.set(nextExpectedId - 1);
                 reader.seek(key);
                 while (entriesToRead > 0) {
+                    long startReadTime = System.nanoTime();
                     reader.next(key, value);
+                    this.offloaderStats.recordReadOffloadDataLatency(managedLedgerName,
+                            System.nanoTime() - startReadTime, TimeUnit.NANOSECONDS);
                     int length = value.getLength();
                     long entryId = key.get();
                     if (entryId == nextExpectedId) {
@@ -122,6 +134,7 @@ public class FileStoreBackedReadHandleImpl implements ReadHandle {
                         buf.writeBytes(value.copyBytes());
                         entriesToRead--;
                         nextExpectedId++;
+                        this.offloaderStats.recordReadOffloadBytes(managedLedgerName, length);
                     } else if (entryId > lastEntry) {
                         log.info("Expected to read {}, but read {}, which is greater than last entry {}",
                                 nextExpectedId, entryId, lastEntry);
@@ -130,6 +143,7 @@ public class FileStoreBackedReadHandleImpl implements ReadHandle {
             }
                 promise.complete(LedgerEntriesImpl.create(entries));
             } catch (Throwable t) {
+                this.offloaderStats.recordReadOffloadError(managedLedgerName);
                 promise.completeExceptionally(t);
                 entries.forEach(LedgerEntry::close);
             }
@@ -176,9 +190,8 @@ public class FileStoreBackedReadHandleImpl implements ReadHandle {
         return promise;
     }
 
-    public static ReadHandle open(ScheduledExecutorService executor,
-                                  MapFile.Reader reader,
-                                  long ledgerId) throws IOException {
-            return new FileStoreBackedReadHandleImpl(executor, reader, ledgerId);
+    public static ReadHandle open(ScheduledExecutorService executor, MapFile.Reader reader, long ledgerId,
+                                  LedgerOffloaderStats offloaderStats, String managedLedgerName) throws IOException {
+        return new FileStoreBackedReadHandleImpl(executor, reader, ledgerId, offloaderStats, managedLedgerName);
     }
 }
diff --git a/tiered-storage/file-system/src/main/java/org/apache/bookkeeper/mledger/offload/filesystem/impl/FileSystemManagedLedgerOffloader.java b/tiered-storage/file-system/src/main/java/org/apache/bookkeeper/mledger/offload/filesystem/impl/FileSystemManagedLedgerOffloader.java
index 02f1e62426a..d91d84b476b 100644
--- a/tiered-storage/file-system/src/main/java/org/apache/bookkeeper/mledger/offload/filesystem/impl/FileSystemManagedLedgerOffloader.java
+++ b/tiered-storage/file-system/src/main/java/org/apache/bookkeeper/mledger/offload/filesystem/impl/FileSystemManagedLedgerOffloader.java
@@ -29,12 +29,14 @@ import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.Semaphore;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
 import org.apache.bookkeeper.client.api.LedgerEntries;
 import org.apache.bookkeeper.client.api.LedgerEntry;
 import org.apache.bookkeeper.client.api.ReadHandle;
 import org.apache.bookkeeper.common.util.OrderedScheduler;
 import org.apache.bookkeeper.mledger.LedgerOffloader;
+import org.apache.bookkeeper.mledger.LedgerOffloaderStats;
 import org.apache.bookkeeper.mledger.offload.filesystem.FileSystemLedgerOffloaderFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
@@ -62,6 +64,7 @@ public class FileSystemManagedLedgerOffloader implements LedgerOffloader {
     private static final long ENTRIES_PER_READ = 100;
     private OrderedScheduler assignmentScheduler;
     private OffloadPoliciesImpl offloadPolicies;
+    private final LedgerOffloaderStats offloaderStats;
 
     public static boolean driverSupported(String driver) {
         return DRIVER_NAMES.equals(driver);
@@ -73,11 +76,13 @@ public class FileSystemManagedLedgerOffloader implements LedgerOffloader {
     }
 
     public static FileSystemManagedLedgerOffloader create(OffloadPoliciesImpl conf,
-                                                          OrderedScheduler scheduler) throws IOException {
-        return new FileSystemManagedLedgerOffloader(conf, scheduler);
+                                                          OrderedScheduler scheduler,
+                                                          LedgerOffloaderStats offloaderStats) throws IOException {
+        return new FileSystemManagedLedgerOffloader(conf, scheduler, offloaderStats);
     }
 
-    private FileSystemManagedLedgerOffloader(OffloadPoliciesImpl conf, OrderedScheduler scheduler) throws IOException {
+    private FileSystemManagedLedgerOffloader(OffloadPoliciesImpl conf, OrderedScheduler scheduler,
+                                             LedgerOffloaderStats offloaderStats) throws IOException {
         this.offloadPolicies = conf;
         this.configuration = new Configuration();
         if (conf.getFileSystemProfilePath() != null) {
@@ -105,13 +110,15 @@ public class FileSystemManagedLedgerOffloader implements LedgerOffloader {
         this.assignmentScheduler = OrderedScheduler.newSchedulerBuilder()
                 .numThreads(conf.getManagedLedgerOffloadMaxThreads())
                 .name("offload-assignment").build();
+        this.offloaderStats = offloaderStats;
     }
 
     @VisibleForTesting
     public FileSystemManagedLedgerOffloader(OffloadPoliciesImpl conf,
                                             OrderedScheduler scheduler,
                                             String testHDFSPath,
-                                            String baseDir) throws IOException {
+                                            String baseDir,
+                                            LedgerOffloaderStats offloaderStats) throws IOException {
         this.offloadPolicies = conf;
         this.configuration = new Configuration();
         this.configuration.set("fs.hdfs.impl", "org.apache.hadoop.hdfs.DistributedFileSystem");
@@ -125,6 +132,7 @@ public class FileSystemManagedLedgerOffloader implements LedgerOffloader {
         this.assignmentScheduler = OrderedScheduler.newSchedulerBuilder()
                 .numThreads(conf.getManagedLedgerOffloadMaxThreads())
                 .name("offload-assignment").build();
+        this.offloaderStats = offloaderStats;
     }
 
     @Override
@@ -142,8 +150,9 @@ public class FileSystemManagedLedgerOffloader implements LedgerOffloader {
     public CompletableFuture<Void> offload(ReadHandle readHandle, UUID uuid, Map<String, String> extraMetadata) {
         CompletableFuture<Void> promise = new CompletableFuture<>();
         scheduler.chooseThread(readHandle.getId()).submit(
-                new LedgerReader(readHandle, uuid, extraMetadata, promise, storageBasePath,
-                        configuration, assignmentScheduler, offloadPolicies.getManagedLedgerOffloadPrefetchRounds()));
+                new LedgerReader(readHandle, uuid, extraMetadata, promise, storageBasePath, configuration,
+                        assignmentScheduler, offloadPolicies.getManagedLedgerOffloadPrefetchRounds(),
+                        this.offloaderStats));
         return promise;
     }
 
@@ -158,6 +167,7 @@ public class FileSystemManagedLedgerOffloader implements LedgerOffloader {
         volatile Exception fileSystemWriteException = null;
         private OrderedScheduler assignmentScheduler;
         private int managedLedgerOffloadPrefetchRounds = 1;
+        private final LedgerOffloaderStats offloaderStats;
 
         private LedgerReader(ReadHandle readHandle,
                              UUID uuid,
@@ -166,7 +176,8 @@ public class FileSystemManagedLedgerOffloader implements LedgerOffloader {
                              String storageBasePath,
                              Configuration configuration,
                              OrderedScheduler assignmentScheduler,
-                             int managedLedgerOffloadPrefetchRounds) {
+                             int managedLedgerOffloadPrefetchRounds,
+                             LedgerOffloaderStats offloaderStats) {
             this.readHandle = readHandle;
             this.uuid = uuid;
             this.extraMetadata = extraMetadata;
@@ -175,6 +186,7 @@ public class FileSystemManagedLedgerOffloader implements LedgerOffloader {
             this.configuration = configuration;
             this.assignmentScheduler = assignmentScheduler;
             this.managedLedgerOffloadPrefetchRounds = managedLedgerOffloadPrefetchRounds;
+            this.offloaderStats = offloaderStats;
         }
 
         @Override
@@ -185,7 +197,8 @@ public class FileSystemManagedLedgerOffloader implements LedgerOffloader {
                 return;
             }
             long ledgerId = readHandle.getId();
-            String storagePath = getStoragePath(storageBasePath, extraMetadata.get(MANAGED_LEDGER_NAME));
+            final String topicName = extraMetadata.get(MANAGED_LEDGER_NAME);
+            String storagePath = getStoragePath(storageBasePath, topicName);
             String dataFilePath = getDataFilePath(storagePath, ledgerId, uuid);
             LongWritable key = new LongWritable();
             BytesWritable value = new BytesWritable();
@@ -208,7 +221,10 @@ public class FileSystemManagedLedgerOffloader implements LedgerOffloader {
                     long end = Math.min(needToOffloadFirstEntryNumber + ENTRIES_PER_READ - 1,
                             readHandle.getLastAddConfirmed());
                     log.debug("read ledger entries. start: {}, end: {}", needToOffloadFirstEntryNumber, end);
+                    long startReadTime = System.nanoTime();
                     LedgerEntries ledgerEntriesOnce = readHandle.readAsync(needToOffloadFirstEntryNumber, end).get();
+                    long cost = System.nanoTime() - startReadTime;
+                    this.offloaderStats.recordReadLedgerLatency(topicName, cost, TimeUnit.NANOSECONDS);
                     semaphore.acquire();
                     countDownLatch = new CountDownLatch(1);
                     assignmentScheduler.chooseThread(ledgerId)
@@ -225,10 +241,11 @@ public class FileSystemManagedLedgerOffloader implements LedgerOffloader {
                 promise.complete(null);
             } catch (Exception e) {
                 log.error("Exception when get CompletableFuture<LedgerEntries> : ManagerLedgerName: {}, "
-                        + "LedgerId: {}, UUID: {} ", extraMetadata.get(MANAGED_LEDGER_NAME), ledgerId, uuid, e);
+                        + "LedgerId: {}, UUID: {} ", topicName, ledgerId, uuid, e);
                 if (e instanceof InterruptedException) {
                     Thread.currentThread().interrupt();
                 }
+                this.offloaderStats.recordOffloadError(topicName);
                 promise.completeExceptionally(e);
             }
         }
@@ -288,6 +305,7 @@ public class FileSystemManagedLedgerOffloader implements LedgerOffloader {
 
         @Override
         public void run() {
+            String managedLedgerName = ledgerReader.extraMetadata.get(MANAGED_LEDGER_NAME);
             if (ledgerReader.fileSystemWriteException == null) {
                 Iterator<LedgerEntry> iterator = ledgerEntriesOnce.iterator();
                 while (iterator.hasNext()) {
@@ -299,9 +317,11 @@ public class FileSystemManagedLedgerOffloader implements LedgerOffloader {
                         dataWriter.append(key, value);
                     } catch (IOException e) {
                         ledgerReader.fileSystemWriteException = e;
+                        ledgerReader.offloaderStats.recordWriteToStorageError(managedLedgerName);
                         break;
                     }
                     haveOffloadEntryNumber.incrementAndGet();
+                    ledgerReader.offloaderStats.recordOffloadBytes(managedLedgerName, entry.getLength());
                 }
             }
             countDownLatch.countDown();
@@ -315,19 +335,19 @@ public class FileSystemManagedLedgerOffloader implements LedgerOffloader {
     public CompletableFuture<ReadHandle> readOffloaded(long ledgerId, UUID uuid,
                                                        Map<String, String> offloadDriverMetadata) {
 
+        final String ledgerName = offloadDriverMetadata.get(MANAGED_LEDGER_NAME);
         CompletableFuture<ReadHandle> promise = new CompletableFuture<>();
-        String storagePath = getStoragePath(storageBasePath, offloadDriverMetadata.get(MANAGED_LEDGER_NAME));
+        String storagePath = getStoragePath(storageBasePath, ledgerName);
         String dataFilePath = getDataFilePath(storagePath, ledgerId, uuid);
         scheduler.chooseThread(ledgerId).submit(() -> {
             try {
                 MapFile.Reader reader = new MapFile.Reader(new Path(dataFilePath),
                         configuration);
-                promise.complete(FileStoreBackedReadHandleImpl.open(scheduler.chooseThread(ledgerId),
-                                                                    reader,
-                                                                    ledgerId));
+                promise.complete(FileStoreBackedReadHandleImpl.open(
+                        scheduler.chooseThread(ledgerId), reader, ledgerId, this.offloaderStats, ledgerName));
             } catch (Throwable t) {
                 log.error("Failed to open FileStoreBackedReadHandleImpl: ManagerLedgerName: {}, "
-                        + "LegerId: {}, UUID: {}", offloadDriverMetadata.get(MANAGED_LEDGER_NAME), ledgerId, uuid, t);
+                        + "LegerId: {}, UUID: {}", ledgerName, ledgerId, uuid, t);
                 promise.completeExceptionally(t);
             }
         });
@@ -344,7 +364,8 @@ public class FileSystemManagedLedgerOffloader implements LedgerOffloader {
 
     @Override
     public CompletableFuture<Void> deleteOffloaded(long ledgerId, UUID uid, Map<String, String> offloadDriverMetadata) {
-        String storagePath = getStoragePath(storageBasePath, offloadDriverMetadata.get(MANAGED_LEDGER_NAME));
+        String ledgerName = offloadDriverMetadata.get(MANAGED_LEDGER_NAME);
+        String storagePath = getStoragePath(storageBasePath, ledgerName);
         String dataFilePath = getDataFilePath(storagePath, ledgerId, uid);
         CompletableFuture<Void> promise = new CompletableFuture<>();
         try {
@@ -354,7 +375,8 @@ public class FileSystemManagedLedgerOffloader implements LedgerOffloader {
             log.error("Failed to delete Offloaded: ", e);
             promise.completeExceptionally(e);
         }
-        return promise;
+        return promise.whenComplete((__, t) ->
+                this.offloaderStats.recordDeleteOffloadOps(ledgerName, t == null));
     }
 
     @Override
diff --git a/tiered-storage/file-system/src/test/java/org/apache/bookkeeper/mledger/offload/filesystem/FileStoreTestBase.java b/tiered-storage/file-system/src/test/java/org/apache/bookkeeper/mledger/offload/filesystem/FileStoreTestBase.java
index 342bce601e8..b3a02a4f7f6 100644
--- a/tiered-storage/file-system/src/test/java/org/apache/bookkeeper/mledger/offload/filesystem/FileStoreTestBase.java
+++ b/tiered-storage/file-system/src/test/java/org/apache/bookkeeper/mledger/offload/filesystem/FileStoreTestBase.java
@@ -19,6 +19,7 @@
 package org.apache.bookkeeper.mledger.offload.filesystem;
 
 import org.apache.bookkeeper.common.util.OrderedScheduler;
+import org.apache.bookkeeper.mledger.LedgerOffloaderStats;
 import org.apache.bookkeeper.mledger.offload.filesystem.impl.FileSystemManagedLedgerOffloader;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
@@ -30,6 +31,7 @@ import org.testng.annotations.BeforeMethod;
 import java.io.File;
 import java.nio.file.Files;
 import java.util.Properties;
+import java.util.concurrent.Executors;
 
 public abstract class FileStoreTestBase {
     protected FileSystemManagedLedgerOffloader fileSystemManagedLedgerOffloader;
@@ -37,6 +39,7 @@ public abstract class FileStoreTestBase {
     protected final String basePath = "pulsar";
     private MiniDFSCluster hdfsCluster;
     private String hdfsURI;
+    protected LedgerOffloaderStats offloaderStats;
 
     @BeforeMethod(alwaysRun = true)
     public void start() throws Exception {
@@ -48,9 +51,10 @@ public abstract class FileStoreTestBase {
 
         hdfsURI = "hdfs://localhost:"+ hdfsCluster.getNameNodePort() + "/";
         Properties properties = new Properties();
+        this.offloaderStats = LedgerOffloaderStats.create(true, true, Executors.newScheduledThreadPool(1), 60);
         fileSystemManagedLedgerOffloader = new FileSystemManagedLedgerOffloader(
                 OffloadPoliciesImpl.create(properties),
-                scheduler, hdfsURI, basePath);
+                scheduler, hdfsURI, basePath, offloaderStats);
     }
 
     @AfterMethod(alwaysRun = true)
diff --git a/tiered-storage/file-system/src/test/java/org/apache/bookkeeper/mledger/offload/filesystem/impl/FileSystemManagedLedgerOffloaderTest.java b/tiered-storage/file-system/src/test/java/org/apache/bookkeeper/mledger/offload/filesystem/impl/FileSystemManagedLedgerOffloaderTest.java
index 3600e9e5c57..3f9dbb35551 100644
--- a/tiered-storage/file-system/src/test/java/org/apache/bookkeeper/mledger/offload/filesystem/impl/FileSystemManagedLedgerOffloaderTest.java
+++ b/tiered-storage/file-system/src/test/java/org/apache/bookkeeper/mledger/offload/filesystem/impl/FileSystemManagedLedgerOffloaderTest.java
@@ -27,12 +27,13 @@ import org.apache.bookkeeper.client.api.LedgerEntries;
 import org.apache.bookkeeper.client.api.LedgerEntry;
 import org.apache.bookkeeper.client.api.ReadHandle;
 import org.apache.bookkeeper.mledger.LedgerOffloader;
+import org.apache.bookkeeper.mledger.impl.LedgerOffloaderStatsImpl;
 import org.apache.bookkeeper.mledger.offload.filesystem.FileStoreTestBase;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
-
 import java.net.URI;
 import java.util.HashMap;
 import java.util.Iterator;
@@ -45,7 +46,7 @@ import static org.testng.Assert.assertTrue;
 
 public class FileSystemManagedLedgerOffloaderTest extends FileStoreTestBase {
     private final PulsarMockBookKeeper bk;
-    private String topic = "public/default/persistent/testOffload";
+    private String topic = "public/default/testOffload";
     private String storagePath = createStoragePath(topic);
     private LedgerHandle lh;
     private ReadHandle toWrite;
@@ -76,6 +77,12 @@ public class FileSystemManagedLedgerOffloaderTest extends FileStoreTestBase {
                 .withPassword("foobar".getBytes()).withDigestType(DigestType.CRC32).execute().get();
     }
 
+    @BeforeMethod(alwaysRun = true)
+    @Override
+    public void start() throws Exception {
+        super.start();
+    }
+
     @Test
     public void testOffloadAndRead() throws Exception {
         LedgerOffloader offloader = fileSystemManagedLedgerOffloader;
@@ -111,6 +118,31 @@ public class FileSystemManagedLedgerOffloaderTest extends FileStoreTestBase {
         }
     }
 
+    @Test
+    public void testOffloadAndReadMetrics() throws Exception {
+        LedgerOffloader offloader = fileSystemManagedLedgerOffloader;
+        UUID uuid = UUID.randomUUID();
+        offloader.offload(toWrite, uuid, map).get();
+
+        LedgerOffloaderStatsImpl offloaderStats = (LedgerOffloaderStatsImpl) this.offloaderStats;
+        assertTrue(offloaderStats.getOffloadError(topic) == 0);
+        assertTrue(offloaderStats.getOffloadBytes(topic) > 0);
+        assertTrue(offloaderStats.getReadLedgerLatency(topic).count > 0);
+        assertTrue(offloaderStats.getWriteStorageError(topic) == 0);
+
+        ReadHandle toTest = offloader.readOffloaded(toWrite.getId(), uuid, map).get();
+        LedgerEntries toTestEntries = toTest.read(0, numberOfEntries - 1);
+        Iterator<LedgerEntry> toTestIter = toTestEntries.iterator();
+        while (toTestIter.hasNext()) {
+            LedgerEntry toTestEntry = toTestIter.next();
+        }
+
+        assertTrue(offloaderStats.getReadOffloadError(topic) == 0);
+        assertTrue(offloaderStats.getReadOffloadBytes(topic) > 0);
+        assertTrue(offloaderStats.getReadOffloadDataLatency(topic).count > 0);
+        assertTrue(offloaderStats.getReadOffloadIndexLatency(topic).count > 0);
+    }
+
     @Test
     public void testDeleteOffload() throws Exception {
         LedgerOffloader offloader = fileSystemManagedLedgerOffloader;
diff --git a/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/JCloudLedgerOffloaderFactory.java b/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/JCloudLedgerOffloaderFactory.java
index c74799ebac0..c7293696c2f 100644
--- a/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/JCloudLedgerOffloaderFactory.java
+++ b/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/JCloudLedgerOffloaderFactory.java
@@ -22,6 +22,7 @@ import java.io.IOException;
 import java.util.Map;
 import org.apache.bookkeeper.common.util.OrderedScheduler;
 import org.apache.bookkeeper.mledger.LedgerOffloaderFactory;
+import org.apache.bookkeeper.mledger.LedgerOffloaderStats;
 import org.apache.bookkeeper.mledger.offload.jcloud.impl.BlobStoreManagedLedgerOffloader;
 import org.apache.bookkeeper.mledger.offload.jcloud.provider.JCloudBlobStoreProvider;
 import org.apache.bookkeeper.mledger.offload.jcloud.provider.TieredStorageConfiguration;
@@ -45,10 +46,11 @@ public class JCloudLedgerOffloaderFactory implements LedgerOffloaderFactory<Blob
 
     @Override
     public BlobStoreManagedLedgerOffloader create(OffloadPoliciesImpl offloadPolicies, Map<String, String> userMetadata,
-                                                  OrderedScheduler scheduler) throws IOException {
+                                                  OrderedScheduler scheduler,
+                                                  LedgerOffloaderStats offloaderStats) throws IOException {
 
         TieredStorageConfiguration config =
                 TieredStorageConfiguration.create(offloadPolicies.toProperties());
-        return BlobStoreManagedLedgerOffloader.create(config, userMetadata, scheduler);
+        return BlobStoreManagedLedgerOffloader.create(config, userMetadata, scheduler, offloaderStats);
     }
 }
diff --git a/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreBackedInputStreamImpl.java b/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreBackedInputStreamImpl.java
index e905884ddd0..61d8d0606f3 100644
--- a/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreBackedInputStreamImpl.java
+++ b/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreBackedInputStreamImpl.java
@@ -21,6 +21,8 @@ package org.apache.bookkeeper.mledger.offload.jcloud.impl;
 import io.netty.buffer.ByteBuf;
 import java.io.IOException;
 import java.io.InputStream;
+import java.util.concurrent.TimeUnit;
+import org.apache.bookkeeper.mledger.LedgerOffloaderStats;
 import org.apache.bookkeeper.mledger.offload.jcloud.BackedInputStream;
 import org.apache.bookkeeper.mledger.offload.jcloud.impl.DataBlockUtils.VersionCheck;
 import org.apache.pulsar.common.allocator.PulsarByteBufAllocator;
@@ -40,6 +42,8 @@ public class BlobStoreBackedInputStreamImpl extends BackedInputStream {
     private final ByteBuf buffer;
     private final long objectLen;
     private final int bufferSize;
+    private LedgerOffloaderStats offloaderStats;
+    private String managedLedgerName;
 
     private long cursor;
     private long bufferOffsetStart;
@@ -59,6 +63,16 @@ public class BlobStoreBackedInputStreamImpl extends BackedInputStream {
         this.bufferOffsetStart = this.bufferOffsetEnd = -1;
     }
 
+
+    public BlobStoreBackedInputStreamImpl(BlobStore blobStore, String bucket, String key,
+                                          VersionCheck versionCheck,
+                                          long objectLen, int bufferSize,
+                                          LedgerOffloaderStats offloaderStats, String managedLedgerName) {
+        this(blobStore, bucket, key, versionCheck, objectLen, bufferSize);
+        this.offloaderStats = offloaderStats;
+        this.managedLedgerName = managedLedgerName;
+    }
+
     /**
      * Refill the buffered input if it is empty.
      * @return true if there are bytes to read, false otherwise
@@ -73,7 +87,13 @@ public class BlobStoreBackedInputStreamImpl extends BackedInputStream {
                                      objectLen - 1);
 
             try {
+                long startReadTime = System.nanoTime();
                 Blob blob = blobStore.getBlob(bucket, key, new GetOptions().range(startRange, endRange));
+                if (this.offloaderStats != null) {
+                    this.offloaderStats.recordReadOffloadDataLatency(managedLedgerName,
+                            System.nanoTime() - startReadTime, TimeUnit.NANOSECONDS);
+                    this.offloaderStats.recordReadOffloadBytes(managedLedgerName, endRange - startRange + 1);
+                }
                 versionCheck.check(key, blob);
 
                 try (InputStream stream = blob.getPayload().openStream()) {
@@ -88,6 +108,9 @@ public class BlobStoreBackedInputStreamImpl extends BackedInputStream {
                     cursor += buffer.readableBytes();
                 }
             } catch (Throwable e) {
+                if (null != this.offloaderStats) {
+                    this.offloaderStats.recordReadOffloadError(this.managedLedgerName);
+                }
                 throw new IOException("Error reading from BlobStore", e);
             }
         }
diff --git a/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreBackedReadHandleImpl.java b/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreBackedReadHandleImpl.java
index e129a11fe43..998912a30c4 100644
--- a/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreBackedReadHandleImpl.java
+++ b/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreBackedReadHandleImpl.java
@@ -27,6 +27,7 @@ import java.util.List;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
 import org.apache.bookkeeper.client.BKException;
 import org.apache.bookkeeper.client.api.LastConfirmedAndEntry;
 import org.apache.bookkeeper.client.api.LedgerEntries;
@@ -35,6 +36,7 @@ import org.apache.bookkeeper.client.api.LedgerMetadata;
 import org.apache.bookkeeper.client.api.ReadHandle;
 import org.apache.bookkeeper.client.impl.LedgerEntriesImpl;
 import org.apache.bookkeeper.client.impl.LedgerEntryImpl;
+import org.apache.bookkeeper.mledger.LedgerOffloaderStats;
 import org.apache.bookkeeper.mledger.offload.jcloud.BackedInputStream;
 import org.apache.bookkeeper.mledger.offload.jcloud.OffloadIndexBlock;
 import org.apache.bookkeeper.mledger.offload.jcloud.OffloadIndexBlockBuilder;
@@ -62,8 +64,7 @@ public class BlobStoreBackedReadHandleImpl implements ReadHandle {
     private State state = null;
 
     private BlobStoreBackedReadHandleImpl(long ledgerId, OffloadIndexBlock index,
-                                          BackedInputStream inputStream,
-                                          ExecutorService executor) {
+                                          BackedInputStream inputStream, ExecutorService executor) {
         this.ledgerId = ledgerId;
         this.index = index;
         this.inputStream = inputStream;
@@ -222,7 +223,8 @@ public class BlobStoreBackedReadHandleImpl implements ReadHandle {
     public static ReadHandle open(ScheduledExecutorService executor,
                                   BlobStore blobStore, String bucket, String key, String indexKey,
                                   VersionCheck versionCheck,
-                                  long ledgerId, int readBufferSize)
+                                  long ledgerId, int readBufferSize,
+                                  LedgerOffloaderStats offloaderStats, String managedLedgerName)
             throws IOException {
         int retryCount = 3;
         OffloadIndexBlock index = null;
@@ -233,7 +235,10 @@ public class BlobStoreBackedReadHandleImpl implements ReadHandle {
         // If we use a backoff to control the retry, it will introduce a concurrent operation.
         // We don't want to make it complicated, because in the most of case it shouldn't in the retry loop.
         while (retryCount-- > 0) {
+            long readIndexStartTime = System.nanoTime();
             Blob blob = blobStore.getBlob(bucket, indexKey);
+            offloaderStats.recordReadOffloadIndexLatency(managedLedgerName,
+                    System.nanoTime() - readIndexStartTime, TimeUnit.NANOSECONDS);
             versionCheck.check(indexKey, blob);
             OffloadIndexBlockBuilder indexBuilder = OffloadIndexBlockBuilder.create();
             try (InputStream payLoadStream = blob.getPayload().openStream()) {
@@ -253,9 +258,7 @@ public class BlobStoreBackedReadHandleImpl implements ReadHandle {
         }
 
         BackedInputStream inputStream = new BlobStoreBackedInputStreamImpl(blobStore, bucket, key,
-                versionCheck,
-                index.getDataObjectLength(),
-                readBufferSize);
+                versionCheck, index.getDataObjectLength(), readBufferSize, offloaderStats, managedLedgerName);
 
         return new BlobStoreBackedReadHandleImpl(ledgerId, index, inputStream, executor);
     }
diff --git a/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreBackedReadHandleImplV2.java b/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreBackedReadHandleImplV2.java
index a6f66132c4a..e1a8296a20e 100644
--- a/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreBackedReadHandleImplV2.java
+++ b/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreBackedReadHandleImplV2.java
@@ -29,6 +29,7 @@ import java.util.List;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
 import lombok.val;
 import org.apache.bookkeeper.client.BKException;
 import org.apache.bookkeeper.client.api.LastConfirmedAndEntry;
@@ -38,6 +39,7 @@ import org.apache.bookkeeper.client.api.LedgerMetadata;
 import org.apache.bookkeeper.client.api.ReadHandle;
 import org.apache.bookkeeper.client.impl.LedgerEntriesImpl;
 import org.apache.bookkeeper.client.impl.LedgerEntryImpl;
+import org.apache.bookkeeper.mledger.LedgerOffloaderStats;
 import org.apache.bookkeeper.mledger.offload.jcloud.BackedInputStream;
 import org.apache.bookkeeper.mledger.offload.jcloud.OffloadIndexBlockV2;
 import org.apache.bookkeeper.mledger.offload.jcloud.OffloadIndexBlockV2Builder;
@@ -274,7 +276,8 @@ public class BlobStoreBackedReadHandleImplV2 implements ReadHandle {
     public static ReadHandle open(ScheduledExecutorService executor,
                                   BlobStore blobStore, String bucket, List<String> keys, List<String> indexKeys,
                                   VersionCheck versionCheck,
-                                  long ledgerId, int readBufferSize)
+                                  long ledgerId, int readBufferSize, LedgerOffloaderStats offloaderStats,
+                                  String managedLedgerName)
             throws IOException {
         List<BackedInputStream> inputStreams = new LinkedList<>();
         List<OffloadIndexBlockV2> indice = new LinkedList<>();
@@ -282,7 +285,10 @@ public class BlobStoreBackedReadHandleImplV2 implements ReadHandle {
             String indexKey = indexKeys.get(i);
             String key = keys.get(i);
             log.debug("open bucket: {} index key: {}", bucket, indexKey);
+            long startTime = System.nanoTime();
             Blob blob = blobStore.getBlob(bucket, indexKey);
+            offloaderStats.recordReadOffloadIndexLatency(managedLedgerName,
+                    System.nanoTime() - startTime, TimeUnit.NANOSECONDS);
             log.debug("indexKey blob: {} {}", indexKey, blob);
             versionCheck.check(indexKey, blob);
             OffloadIndexBlockV2Builder indexBuilder = OffloadIndexBlockV2Builder.create();
@@ -292,9 +298,7 @@ public class BlobStoreBackedReadHandleImplV2 implements ReadHandle {
             }
 
             BackedInputStream inputStream = new BlobStoreBackedInputStreamImpl(blobStore, bucket, key,
-                    versionCheck,
-                    index.getDataObjectLength(),
-                    readBufferSize);
+                    versionCheck, index.getDataObjectLength(), readBufferSize, offloaderStats, managedLedgerName);
             inputStreams.add(inputStream);
             indice.add(index);
         }
diff --git a/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreManagedLedgerOffloader.java b/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreManagedLedgerOffloader.java
index 9aa7ecde660..1fdc0983b0e 100644
--- a/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreManagedLedgerOffloader.java
+++ b/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreManagedLedgerOffloader.java
@@ -45,6 +45,7 @@ import org.apache.bookkeeper.common.util.OrderedScheduler;
 import org.apache.bookkeeper.mledger.Entry;
 import org.apache.bookkeeper.mledger.LedgerOffloader;
 import org.apache.bookkeeper.mledger.LedgerOffloader.OffloadHandle.OfferEntryResult;
+import org.apache.bookkeeper.mledger.LedgerOffloaderStats;
 import org.apache.bookkeeper.mledger.ManagedLedger;
 import org.apache.bookkeeper.mledger.ManagedLedgerException;
 import org.apache.bookkeeper.mledger.OffloadedLedgerMetadata;
@@ -91,6 +92,8 @@ import org.jclouds.io.payloads.InputStreamPayload;
 @Slf4j
 public class BlobStoreManagedLedgerOffloader implements LedgerOffloader {
 
+    private static final String MANAGED_LEDGER_NAME = "ManagedLedgerName";
+
     private final OrderedScheduler scheduler;
     private final TieredStorageConfiguration config;
     private final Location writeLocation;
@@ -113,16 +116,18 @@ public class BlobStoreManagedLedgerOffloader implements LedgerOffloader {
     private final int streamingBlockSize;
     private volatile ManagedLedger ml;
     private OffloadIndexBlockV2Builder streamingIndexBuilder;
+    private final LedgerOffloaderStats offloaderStats;
 
     public static BlobStoreManagedLedgerOffloader create(TieredStorageConfiguration config,
                                                          Map<String, String> userMetadata,
-                                                         OrderedScheduler scheduler) throws IOException {
+                                                         OrderedScheduler scheduler,
+                                                         LedgerOffloaderStats offloaderStats) throws IOException {
 
-        return new BlobStoreManagedLedgerOffloader(config, scheduler, userMetadata);
+        return new BlobStoreManagedLedgerOffloader(config, scheduler, userMetadata, offloaderStats);
     }
 
     BlobStoreManagedLedgerOffloader(TieredStorageConfiguration config, OrderedScheduler scheduler,
-                                    Map<String, String> userMetadata) {
+                                    Map<String, String> userMetadata, LedgerOffloaderStats offloaderStats) {
 
         this.scheduler = scheduler;
         this.userMetadata = userMetadata;
@@ -149,6 +154,7 @@ public class BlobStoreManagedLedgerOffloader implements LedgerOffloader {
                 config.getBucket(), config.getRegion());
 
         blobStores.putIfAbsent(config.getBlobStoreLocation(), config.getBlobStore());
+        this.offloaderStats = offloaderStats;
         log.info("The ledger offloader was created.");
     }
 
@@ -170,6 +176,7 @@ public class BlobStoreManagedLedgerOffloader implements LedgerOffloader {
     public CompletableFuture<Void> offload(ReadHandle readHandle,
                                            UUID uuid,
                                            Map<String, String> extraMetadata) {
+        final String topicName = extraMetadata.get(MANAGED_LEDGER_NAME);
         final BlobStore writeBlobStore = blobStores.get(config.getBlobStoreLocation());
         log.info("offload {} uuid {} extraMetadata {} to {} {}", readHandle.getId(), uuid, extraMetadata,
                 config.getBlobStoreLocation(), writeBlobStore);
@@ -212,13 +219,14 @@ public class BlobStoreManagedLedgerOffloader implements LedgerOffloader {
             try {
                 long startEntry = 0;
                 int partId = 1;
+                long start = System.nanoTime();
                 long entryBytesWritten = 0;
                 while (startEntry <= readHandle.getLastAddConfirmed()) {
                     int blockSize = BlockAwareSegmentInputStreamImpl
                         .calculateBlockSize(config.getMaxBlockSizeInBytes(), readHandle, startEntry, entryBytesWritten);
 
                     try (BlockAwareSegmentInputStream blockStream = new BlockAwareSegmentInputStreamImpl(
-                        readHandle, startEntry, blockSize)) {
+                            readHandle, startEntry, blockSize, this.offloaderStats, topicName)) {
 
                         Payload partPayload = Payloads.newInputStreamPayload(blockStream);
                         partPayload.getContentMetadata().setContentLength((long) blockSize);
@@ -237,6 +245,7 @@ public class BlobStoreManagedLedgerOffloader implements LedgerOffloader {
                         }
                         entryBytesWritten += blockStream.getBlockEntryBytesCount();
                         partId++;
+                        this.offloaderStats.recordOffloadBytes(topicName, blockStream.getBlockEntryBytesCount());
                     }
 
                     dataObjectLength += blockSize;
@@ -254,6 +263,8 @@ public class BlobStoreManagedLedgerOffloader implements LedgerOffloader {
                     log.error("Failed abortMultipartUpload in bucket - {} with key - {}, uploadId - {}.",
                             config.getBucket(), dataBlockKey, mpu.id(), throwable);
                 }
+                this.offloaderStats.recordWriteToStorageError(topicName);
+                this.offloaderStats.recordOffloadError(topicName);
                 promise.completeExceptionally(t);
                 return;
             }
@@ -277,7 +288,6 @@ public class BlobStoreManagedLedgerOffloader implements LedgerOffloader {
                         .payload(indexPayload)
                         .contentLength((long) indexStream.getStreamSize())
                     .build();
-
                 writeBlobStore.putBlob(config.getBucket(), blob);
                 promise.complete(null);
             } catch (Throwable t) {
@@ -287,6 +297,9 @@ public class BlobStoreManagedLedgerOffloader implements LedgerOffloader {
                     log.error("Failed deleteObject in bucket - {} with key - {}.",
                             config.getBucket(), dataBlockKey, throwable);
                 }
+
+                this.offloaderStats.recordWriteToStorageError(topicName);
+                this.offloaderStats.recordOffloadError(topicName);
                 promise.completeExceptionally(t);
                 return;
             }
@@ -532,7 +545,8 @@ public class BlobStoreManagedLedgerOffloader implements LedgerOffloader {
                         readBlobstore,
                         readBucket, key, indexKey,
                         DataBlockUtils.VERSION_CHECK,
-                        ledgerId, config.getReadBufferSizeInBytes()));
+                        ledgerId, config.getReadBufferSizeInBytes(),
+                        this.offloaderStats, offloadDriverMetadata.get(MANAGED_LEDGER_NAME)));
             } catch (Throwable t) {
                 log.error("Failed readOffloaded: ", t);
                 promise.completeExceptionally(t);
@@ -565,7 +579,8 @@ public class BlobStoreManagedLedgerOffloader implements LedgerOffloader {
                         readBlobstore,
                         readBucket, keys, indexKeys,
                         DataBlockUtils.VERSION_CHECK,
-                        ledgerId, config.getReadBufferSizeInBytes()));
+                        ledgerId, config.getReadBufferSizeInBytes(),
+                        this.offloaderStats, offloadDriverMetadata.get(MANAGED_LEDGER_NAME)));
             } catch (Throwable t) {
                 log.error("Failed readOffloaded: ", t);
                 promise.completeExceptionally(t);
@@ -594,7 +609,11 @@ public class BlobStoreManagedLedgerOffloader implements LedgerOffloader {
             }
         });
 
-        return promise;
+        return promise.whenComplete((__, t) -> {
+            if (null != this.ml) {
+                this.offloaderStats.recordDeleteOffloadOps(this.ml.getName(), t == null);
+            }
+        });
     }
 
     @Override
@@ -616,7 +635,8 @@ public class BlobStoreManagedLedgerOffloader implements LedgerOffloader {
             }
         });
 
-        return promise;
+        return promise.whenComplete((__, t) ->
+                this.offloaderStats.recordDeleteOffloadOps(this.ml.getName(), t == null));
     }
 
     @Override
diff --git a/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlockAwareSegmentInputStreamImpl.java b/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlockAwareSegmentInputStreamImpl.java
index a4ffdea6509..da1f92438f0 100644
--- a/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlockAwareSegmentInputStreamImpl.java
+++ b/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlockAwareSegmentInputStreamImpl.java
@@ -27,9 +27,11 @@ import java.io.InputStream;
 import java.util.Iterator;
 import java.util.List;
 import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
 import org.apache.bookkeeper.client.api.LedgerEntries;
 import org.apache.bookkeeper.client.api.LedgerEntry;
 import org.apache.bookkeeper.client.api.ReadHandle;
+import org.apache.bookkeeper.mledger.LedgerOffloaderStats;
 import org.apache.bookkeeper.mledger.offload.jcloud.BlockAwareSegmentInputStream;
 import org.apache.pulsar.common.allocator.PulsarByteBufAllocator;
 import org.slf4j.Logger;
@@ -65,6 +67,8 @@ public class BlockAwareSegmentInputStreamImpl extends BlockAwareSegmentInputStre
     static final int ENTRY_HEADER_SIZE = 4 /* entry size */ + 8 /* entry id */;
     // Keep a list of all entries ByteBuf, each ByteBuf contains 2 buf: entry header and entry content.
     private List<ByteBuf> entriesByteBuf = null;
+    private LedgerOffloaderStats offloaderStats;
+    private String topicName;
 
     public BlockAwareSegmentInputStreamImpl(ReadHandle ledger, long startEntryId, int blockSize) {
         this.ledger = ledger;
@@ -76,6 +80,13 @@ public class BlockAwareSegmentInputStreamImpl extends BlockAwareSegmentInputStre
         this.entriesByteBuf = Lists.newLinkedList();
     }
 
+    public BlockAwareSegmentInputStreamImpl(ReadHandle ledger, long startEntryId, int blockSize,
+                                            LedgerOffloaderStats offloaderStats, String ledgerName) {
+        this(ledger, startEntryId, blockSize);
+        this.offloaderStats = offloaderStats;
+        this.topicName = ledgerName;
+    }
+
     // read ledger entries.
     private int readEntries() throws IOException {
         checkState(bytesReadOffset >= DataBlockHeaderImpl.getDataStartOffset());
@@ -113,11 +124,18 @@ public class BlockAwareSegmentInputStreamImpl extends BlockAwareSegmentInputStre
 
     private List<ByteBuf> readNextEntriesFromLedger(long start, long maxNumberEntries) throws IOException {
         long end = Math.min(start + maxNumberEntries - 1, ledger.getLastAddConfirmed());
+        long startTime = System.nanoTime();
         try (LedgerEntries ledgerEntriesOnce = ledger.readAsync(start, end).get()) {
-            log.debug("read ledger entries. start: {}, end: {}", start, end);
+            if (log.isDebugEnabled()) {
+                log.debug("read ledger entries. start: {}, end: {} cost {}", start, end,
+                        TimeUnit.NANOSECONDS.toMicros(System.nanoTime() - startTime));
+            }
+            if (offloaderStats != null && topicName != null) {
+                offloaderStats.recordReadLedgerLatency(topicName, System.nanoTime() - startTime,
+                        TimeUnit.NANOSECONDS);
+            }
 
             List<ByteBuf> entries = Lists.newLinkedList();
-
             Iterator<LedgerEntry> iterator = ledgerEntriesOnce.iterator();
             while (iterator.hasNext()) {
                 LedgerEntry entry = iterator.next();
diff --git a/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreManagedLedgerOffloaderStreamingTest.java b/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreManagedLedgerOffloaderStreamingTest.java
index 32ad4980f46..952e758f0e0 100644
--- a/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreManagedLedgerOffloaderStreamingTest.java
+++ b/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreManagedLedgerOffloaderStreamingTest.java
@@ -34,6 +34,7 @@ import org.apache.bookkeeper.client.api.ReadHandle;
 import org.apache.bookkeeper.mledger.Entry;
 import org.apache.bookkeeper.mledger.LedgerOffloader;
 import org.apache.bookkeeper.mledger.LedgerOffloader.OffloadHandle;
+import org.apache.bookkeeper.mledger.LedgerOffloaderStats;
 import org.apache.bookkeeper.mledger.ManagedLedger;
 import org.apache.bookkeeper.mledger.impl.EntryImpl;
 import org.apache.bookkeeper.mledger.offload.jcloud.provider.JCloudBlobStoreProvider;
@@ -52,6 +53,7 @@ public class BlobStoreManagedLedgerOffloaderStreamingTest extends BlobStoreManag
     private static final Logger log = LoggerFactory.getLogger(BlobStoreManagedLedgerOffloaderStreamingTest.class);
     private TieredStorageConfiguration mockedConfig;
     private static final Random random = new Random();
+    private final LedgerOffloaderStats offloaderStats;
 
     BlobStoreManagedLedgerOffloaderStreamingTest() throws Exception {
         super();
@@ -60,6 +62,7 @@ public class BlobStoreManagedLedgerOffloaderStreamingTest extends BlobStoreManag
         assertNotNull(provider);
         provider.validate(config);
         blobStore = provider.getBlobStore(config);
+        this.offloaderStats = LedgerOffloaderStats.create(false, false, null, 0);
     }
 
     private BlobStoreManagedLedgerOffloader getOffloader(Map<String, String> additionalConfig) throws IOException {
@@ -76,7 +79,7 @@ public class BlobStoreManagedLedgerOffloaderStreamingTest extends BlobStoreManag
         mockedConfig = mock(TieredStorageConfiguration.class, delegatesTo(getConfiguration(bucket, additionalConfig)));
         Mockito.doReturn(blobStore).when(mockedConfig).getBlobStore(); // Use the REAL blobStore
         BlobStoreManagedLedgerOffloader offloader = BlobStoreManagedLedgerOffloader
-                .create(mockedConfig, new HashMap<String, String>(), scheduler);
+                .create(mockedConfig, new HashMap<String, String>(), scheduler, this.offloaderStats);
         return offloader;
     }
 
@@ -85,7 +88,7 @@ public class BlobStoreManagedLedgerOffloaderStreamingTest extends BlobStoreManag
         mockedConfig = mock(TieredStorageConfiguration.class, delegatesTo(getConfiguration(bucket, additionalConfig)));
         Mockito.doReturn(mockedBlobStore).when(mockedConfig).getBlobStore();
         BlobStoreManagedLedgerOffloader offloader = BlobStoreManagedLedgerOffloader
-                .create(mockedConfig, new HashMap<String, String>(), scheduler);
+                .create(mockedConfig, new HashMap<String, String>(), scheduler, this.offloaderStats);
         return offloader;
     }
 
diff --git a/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreManagedLedgerOffloaderTest.java b/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreManagedLedgerOffloaderTest.java
index c607e79ddc2..74d03b11dba 100644
--- a/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreManagedLedgerOffloaderTest.java
+++ b/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreManagedLedgerOffloaderTest.java
@@ -23,10 +23,9 @@ import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.anyInt;
 import static org.mockito.ArgumentMatchers.anyLong;
 import static org.mockito.Mockito.mock;
-import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertNotNull;
-import static org.testng.Assert.fail;
-
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertTrue;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collections;
@@ -38,12 +37,14 @@ import java.util.Random;
 import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
-
+import java.util.concurrent.Executors;
 import org.apache.bookkeeper.client.BKException;
 import org.apache.bookkeeper.client.api.LedgerEntries;
 import org.apache.bookkeeper.client.api.LedgerEntry;
 import org.apache.bookkeeper.client.api.ReadHandle;
 import org.apache.bookkeeper.mledger.LedgerOffloader;
+import org.apache.bookkeeper.mledger.LedgerOffloaderStats;
+import org.apache.bookkeeper.mledger.impl.LedgerOffloaderStatsImpl;
 import org.apache.bookkeeper.mledger.OffloadedLedgerMetadata;
 import org.apache.bookkeeper.mledger.offload.jcloud.provider.JCloudBlobStoreProvider;
 import org.apache.bookkeeper.mledger.offload.jcloud.provider.TieredStorageConfiguration;
@@ -60,6 +61,7 @@ public class BlobStoreManagedLedgerOffloaderTest extends BlobStoreManagedLedgerO
 
     private static final Logger log = LoggerFactory.getLogger(BlobStoreManagedLedgerOffloaderTest.class);
     private TieredStorageConfiguration mockedConfig;
+    private final LedgerOffloaderStats offloaderStats;
 
     BlobStoreManagedLedgerOffloaderTest() throws Exception {
         super();
@@ -68,6 +70,7 @@ public class BlobStoreManagedLedgerOffloaderTest extends BlobStoreManagedLedgerO
         assertNotNull(provider);
         provider.validate(config);
         blobStore = provider.getBlobStore(config);
+        this.offloaderStats = LedgerOffloaderStats.create(true, true, Executors.newScheduledThreadPool(1), 60);
     }
 
     private BlobStoreManagedLedgerOffloader getOffloader() throws IOException {
@@ -81,14 +84,14 @@ public class BlobStoreManagedLedgerOffloaderTest extends BlobStoreManagedLedgerO
     private BlobStoreManagedLedgerOffloader getOffloader(String bucket) throws IOException {
         mockedConfig = mock(TieredStorageConfiguration.class, delegatesTo(getConfiguration(bucket)));
         Mockito.doReturn(blobStore).when(mockedConfig).getBlobStore(); // Use the REAL blobStore
-        BlobStoreManagedLedgerOffloader offloader = BlobStoreManagedLedgerOffloader.create(mockedConfig, new HashMap<String,String>(), scheduler);
+        BlobStoreManagedLedgerOffloader offloader = BlobStoreManagedLedgerOffloader.create(mockedConfig, new HashMap<String,String>(), scheduler, this.offloaderStats);
         return offloader;
     }
     
     private BlobStoreManagedLedgerOffloader getOffloader(String bucket, BlobStore mockedBlobStore) throws IOException {
         mockedConfig = mock(TieredStorageConfiguration.class, delegatesTo(getConfiguration(bucket)));
         Mockito.doReturn(mockedBlobStore).when(mockedConfig).getBlobStore(); 
-        BlobStoreManagedLedgerOffloader offloader = BlobStoreManagedLedgerOffloader.create(mockedConfig, new HashMap<String,String>(), scheduler);
+        BlobStoreManagedLedgerOffloader offloader = BlobStoreManagedLedgerOffloader.create(mockedConfig, new HashMap<String,String>(), scheduler, this.offloaderStats);
         return offloader;
     }
 
@@ -162,6 +165,41 @@ public class BlobStoreManagedLedgerOffloaderTest extends BlobStoreManagedLedgerO
         Assert.assertEquals(toTest.getState(), BlobStoreBackedReadHandleImpl.State.Closed);
     }
 
+    @Test(timeOut = 600000)  // 10 minutes.
+    public void testOffloadAndReadMetrics() throws Exception {
+        ReadHandle toWrite = buildReadHandle(DEFAULT_BLOCK_SIZE, 3);
+        LedgerOffloader offloader = getOffloader();
+
+        UUID uuid = UUID.randomUUID();
+
+        String topic = "test";
+        Map<String, String> extraMap = new HashMap<>();
+        extraMap.put("ManagedLedgerName", topic);
+        offloader.offload(toWrite, uuid, extraMap).get();
+
+        LedgerOffloaderStatsImpl offloaderStats = (LedgerOffloaderStatsImpl) this.offloaderStats;
+
+        assertEquals(offloaderStats.getOffloadError(topic), 0);
+        assertTrue(offloaderStats.getOffloadBytes(topic) > 0 );
+        assertTrue(offloaderStats.getReadLedgerLatency(topic).count > 0);
+        assertEquals(offloaderStats.getWriteStorageError(topic), 0);
+
+        Map<String, String> map = new HashMap<>();
+        map.putAll(offloader.getOffloadDriverMetadata());
+        map.put("ManagedLedgerName", topic);
+        ReadHandle toTest = offloader.readOffloaded(toWrite.getId(), uuid, map).get();
+        LedgerEntries toTestEntries = toTest.read(0, toTest.getLastAddConfirmed());
+        Iterator<LedgerEntry> toTestIter = toTestEntries.iterator();
+        while (toTestIter.hasNext()) {
+            LedgerEntry toTestEntry = toTestIter.next();
+        }
+
+        assertEquals(offloaderStats.getReadOffloadError(topic), 0);
+        assertTrue(offloaderStats.getReadOffloadBytes(topic) > 0);
+        assertTrue(offloaderStats.getReadOffloadDataLatency(topic).count > 0);
+        assertTrue(offloaderStats.getReadOffloadIndexLatency(topic).count > 0);
+    }
+
     @Test
     public void testOffloadFailInitDataBlockUpload() throws Exception {
         ReadHandle readHandle = buildReadHandle();
@@ -497,7 +535,7 @@ public class BlobStoreManagedLedgerOffloaderTest extends BlobStoreManagedLedgerO
         try {
             toTest.readAsync(0, 0).get();
         } catch (Exception e) {
-            fail("Get unexpected exception when reading entries", e);
+            Assert.fail("Get unexpected exception when reading entries", e);
         }
     }