You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@bookkeeper.apache.org by sh...@apache.org on 2022/09/27 03:02:59 UTC

[bookkeeper] branch master updated: unified latency metric unit (#3501)

This is an automated email from the ASF dual-hosted git repository.

shoothzj pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git


The following commit(s) were added to refs/heads/master by this push:
     new 1e387f8ef3 unified latency metric unit (#3501)
1e387f8ef3 is described below

commit 1e387f8ef3fdf2c9ba9efcff4f781ef9f17ad75d
Author: StevenLuMT <42...@users.noreply.github.com>
AuthorDate: Tue Sep 27 11:02:54 2022 +0800

    unified latency metric unit (#3501)
    
    Descriptions of the changes in this PR:
    
    ### Motivation
    
    OpStatsLogger.registerSuccessfulEvent converts the reported latency to milliseconds before recording it:
    <img width="855" alt="image" src="https://user-images.githubusercontent.com/42990025/191929953-bdac2d25-3fbe-4697-b3e9-f4942ed7c074.png">
    Counter.add, by contrast, performs no unit conversion and records the raw value as given.
    
    As a result, when a Counter is used for latency statistics, its time unit differs from the one used by OpStatsLogger, which is misleading.
    This change unifies the latency metric unit.
    
    ### Changes
    
    1. Rename Counter.add to Counter.addCount.
    2. Add a new method, Counter.addLatency, which converts the given time to milliseconds before adding it to the counter.
    
    How to use Counter correctly after this change:
    1. When using a Counter for a latency metric, call Counter.addLatency.
    2. When using a Counter for a count metric, call Counter.addCount.
---
 .../bookkeeper/common/stats/BroadCastStatsLogger.java | 13 ++++++++++---
 .../org/apache/bookkeeper/test/TestStatsProvider.java |  8 +++++++-
 .../java/org/apache/bookkeeper/bookie/BookieImpl.java |  4 ++--
 .../org/apache/bookkeeper/bookie/EntryMemTable.java   |  2 +-
 .../bookie/EntryMemTableWithParallelFlusher.java      |  2 +-
 .../bookkeeper/bookie/GarbageCollectorThread.java     |  4 ++--
 .../java/org/apache/bookkeeper/bookie/Journal.java    |  8 ++++----
 .../java/org/apache/bookkeeper/bookie/SyncThread.java |  4 ++--
 .../storage/ldb/SingleDirectoryDbLedgerStorage.java   | 19 ++++++++++++-------
 .../bookkeeper/server/service/ScrubberService.java    |  2 +-
 .../java/org/apache/bookkeeper/stats/Counter.java     | 12 +++++++++++-
 .../org/apache/bookkeeper/stats/NullStatsLogger.java  |  7 ++++++-
 .../stats/codahale/CodahaleStatsLogger.java           |  9 ++++++++-
 .../bookkeeper/stats/prometheus/LongAdderCounter.java | 13 ++++++++++++-
 .../prometheus/ThreadScopedLongAdderCounter.java      | 10 ++++++++--
 .../prometheus/TestPrometheusMetricsProvider.java     | 12 +++++++++++-
 .../common/stats/BroadCastStatsLogger.java            | 13 ++++++++++---
 .../org/apache/distributedlog/BKAsyncLogWriter.java   |  4 ++--
 18 files changed, 110 insertions(+), 36 deletions(-)

diff --git a/bookkeeper-common/src/main/java/org/apache/bookkeeper/common/stats/BroadCastStatsLogger.java b/bookkeeper-common/src/main/java/org/apache/bookkeeper/common/stats/BroadCastStatsLogger.java
index 86020c0fd2..26cb334957 100644
--- a/bookkeeper-common/src/main/java/org/apache/bookkeeper/common/stats/BroadCastStatsLogger.java
+++ b/bookkeeper-common/src/main/java/org/apache/bookkeeper/common/stats/BroadCastStatsLogger.java
@@ -127,9 +127,16 @@ public class BroadCastStatsLogger {
                 }
 
                 @Override
-                public void add(long l) {
-                    firstCounter.add(l);
-                    secondCounter.add(l);
+                public void addCount(long l) {
+                    firstCounter.addCount(l);
+                    secondCounter.addCount(l);
+                }
+
+                @Override
+                public void addLatency(long eventLatency, TimeUnit unit) {
+                    long valueMillis = unit.toMillis(eventLatency);
+                    firstCounter.addCount(valueMillis);
+                    secondCounter.addCount(valueMillis);
                 }
 
                 @Override
diff --git a/bookkeeper-common/src/test/java/org/apache/bookkeeper/test/TestStatsProvider.java b/bookkeeper-common/src/test/java/org/apache/bookkeeper/test/TestStatsProvider.java
index c2a06d6f41..d3299b077e 100644
--- a/bookkeeper-common/src/test/java/org/apache/bookkeeper/test/TestStatsProvider.java
+++ b/bookkeeper-common/src/test/java/org/apache/bookkeeper/test/TestStatsProvider.java
@@ -60,10 +60,16 @@ public class TestStatsProvider implements StatsProvider {
         }
 
         @Override
-        public void add(long delta) {
+        public void addCount(long delta) {
             updateMax(val.addAndGet(delta));
         }
 
+        @Override
+        public void addLatency(long eventLatency, TimeUnit unit) {
+            long valueMillis = unit.toMillis(eventLatency);
+            updateMax(val.addAndGet(valueMillis));
+        }
+
         @Override
         public Long get() {
             return val.get();
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieImpl.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieImpl.java
index 404743a761..df11852235 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieImpl.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieImpl.java
@@ -931,7 +931,7 @@ public class BookieImpl extends BookieCriticalThread implements Bookie {
         long ledgerId = handle.getLedgerId();
         long entryId = handle.addEntry(entry);
 
-        bookieStats.getWriteBytes().add(entry.readableBytes());
+        bookieStats.getWriteBytes().addCount(entry.readableBytes());
 
         // journal `addEntry` should happen after the entry is added to ledger storage.
         // otherwise the journal entry can potentially be rolled before the ledger is created in ledger storage.
@@ -1110,7 +1110,7 @@ public class BookieImpl extends BookieCriticalThread implements Bookie {
             }
             ByteBuf entry = handle.readEntry(entryId);
             entrySize = entry.readableBytes();
-            bookieStats.getReadBytes().add(entrySize);
+            bookieStats.getReadBytes().addCount(entrySize);
             success = true;
             return entry;
         } finally {
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryMemTable.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryMemTable.java
index b868e46657..332b152a42 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryMemTable.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryMemTable.java
@@ -258,7 +258,7 @@ public class EntryMemTable implements AutoCloseable{
                             }
                         }
                     }
-                    memTableStats.getFlushBytesCounter().add(size);
+                    memTableStats.getFlushBytesCounter().addCount(size);
                     clearSnapshot(keyValues);
                 }
             }
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryMemTableWithParallelFlusher.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryMemTableWithParallelFlusher.java
index d092f2064c..42a7f19a65 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryMemTableWithParallelFlusher.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryMemTableWithParallelFlusher.java
@@ -139,7 +139,7 @@ class EntryMemTableWithParallelFlusher extends EntryMemTable {
                         throw new IOException("Failed to complete the flushSnapshotByParallelizing",
                                 exceptionWhileFlushingParallelly.get());
                     }
-                    memTableStats.getFlushBytesCounter().add(flushedSize.get());
+                    memTableStats.getFlushBytesCounter().addCount(flushedSize.get());
                     clearSnapshot(keyValues);
                 }
             }
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/GarbageCollectorThread.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/GarbageCollectorThread.java
index c89acf9a05..e7549aaddb 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/GarbageCollectorThread.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/GarbageCollectorThread.java
@@ -489,7 +489,7 @@ public class GarbageCollectorThread extends SafeRunnable {
                     // We can remove this entry log file now.
                     LOG.info("Deleting entryLogId {} as it has no active ledgers!", entryLogId);
                     removeEntryLog(entryLogId);
-                    gcStats.getReclaimedSpaceViaDeletes().add(meta.getTotalSize());
+                    gcStats.getReclaimedSpaceViaDeletes().addCount(meta.getTotalSize());
                 } else if (modified) {
                     // update entryLogMetaMap only when the meta modified.
                     entryLogMetaMap.put(meta.getEntryLogId(), meta);
@@ -607,7 +607,7 @@ public class GarbageCollectorThread extends SafeRunnable {
 
                     long priorRemainingSize = meta.getRemainingSize();
                     compactEntryLog(meta);
-                    gcStats.getReclaimedSpaceViaCompaction().add(meta.getTotalSize() - priorRemainingSize);
+                    gcStats.getReclaimedSpaceViaCompaction().addCount(meta.getTotalSize() - priorRemainingSize);
                     compactedBuckets[bucketIndex]++;
                 });
             }
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java
index 5b60eae2e8..69a0ac8566 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java
@@ -346,7 +346,7 @@ public class Journal extends BookieCriticalThread implements CheckpointSource {
             journalAddEntryStats.registerSuccessfulEvent(MathUtils.elapsedNanos(enqueueTime), TimeUnit.NANOSECONDS);
             cb.writeComplete(0, ledgerId, entryId, null, ctx);
             recycle();
-            callbackTime.add(MathUtils.elapsedNanos(startTime));
+            callbackTime.addLatency(MathUtils.elapsedNanos(startTime), TimeUnit.NANOSECONDS);
         }
 
         private final Handle<QueueEntry> recyclerHandle;
@@ -515,7 +515,7 @@ public class Journal extends BookieCriticalThread implements CheckpointSource {
             while (running) {
                 ForceWriteRequest req = null;
                 try {
-                    forceWriteThreadTime.add(MathUtils.elapsedNanos(busyStartTime));
+                    forceWriteThreadTime.addLatency(MathUtils.elapsedNanos(busyStartTime), TimeUnit.NANOSECONDS);
                     req = forceWriteRequests.take();
                     busyStartTime = System.nanoTime();
                     // Force write the file and then notify the write completions
@@ -1081,7 +1081,7 @@ public class Journal extends BookieCriticalThread implements CheckpointSource {
                     }
 
                     if (numEntriesToFlush == 0) {
-                        journalTime.add(MathUtils.elapsedNanos(busyStartTime));
+                        journalTime.addLatency(MathUtils.elapsedNanos(busyStartTime), TimeUnit.NANOSECONDS);
                         qe = queue.take();
                         dequeueStartTime = MathUtils.nowInNano();
                         busyStartTime = dequeueStartTime;
@@ -1230,7 +1230,7 @@ public class Journal extends BookieCriticalThread implements CheckpointSource {
                     qe.entry.release();
                 } else if (qe.entryId != BookieImpl.METAENTRY_ID_FORCE_LEDGER) {
                     int entrySize = qe.entry.readableBytes();
-                    journalStats.getJournalWriteBytes().add(entrySize);
+                    journalStats.getJournalWriteBytes().addCount(entrySize);
 
                     batchSize += (4 + entrySize);
 
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/SyncThread.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/SyncThread.java
index b2fe868492..4ea6dd27e4 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/SyncThread.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/SyncThread.java
@@ -111,7 +111,7 @@ class SyncThread implements Checkpointer {
                 log.error("Exception in SyncThread", t);
                 dirsListener.fatalError();
             } finally {
-                syncExecutorTime.add(MathUtils.elapsedNanos(startTime));
+                syncExecutorTime.addLatency(MathUtils.elapsedNanos(startTime), TimeUnit.NANOSECONDS);
             }
         });
     }
@@ -124,7 +124,7 @@ class SyncThread implements Checkpointer {
             } catch (Throwable t) {
                 log.error("Exception flushing ledgers ", t);
             } finally {
-                syncExecutorTime.add(MathUtils.elapsedNanos(startTime));
+                syncExecutorTime.addLatency(MathUtils.elapsedNanos(startTime), TimeUnit.NANOSECONDS);
             }
         });
     }
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/SingleDirectoryDbLedgerStorage.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/SingleDirectoryDbLedgerStorage.java
index 0c5f328d2d..f450906de5 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/SingleDirectoryDbLedgerStorage.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/SingleDirectoryDbLedgerStorage.java
@@ -212,7 +212,7 @@ public class SingleDirectoryDbLedgerStorage implements CompactableLedgerStorage
             ThreadRegistry.register(dbStoragerExecutorName, 0);
             // ensure the metric gets registered on start-up as this thread only executes
             // when the write cache is full which may not happen or not for a long time
-            flushExecutorTime.add(0);
+            flushExecutorTime.addLatency(0, TimeUnit.NANOSECONDS);
         });
 
         ledgerDirsManager.addLedgerDirsListener(getLedgerDirsListener());
@@ -463,7 +463,7 @@ public class SingleDirectoryDbLedgerStorage implements CompactableLedgerStorage
                         } catch (IOException e) {
                             log.error("Error during flush", e);
                         } finally {
-                            flushExecutorTime.add(MathUtils.elapsedNanos(startTime));
+                            flushExecutorTime.addLatency(MathUtils.elapsedNanos(startTime), TimeUnit.NANOSECONDS);
                         }
                     });
             }
@@ -570,14 +570,16 @@ public class SingleDirectoryDbLedgerStorage implements CompactableLedgerStorage
                 throw new NoEntryException(ledgerId, entryId);
             }
         } finally {
-            dbLedgerStorageStats.getReadFromLocationIndexTime().add(MathUtils.elapsedNanos(locationIndexStartNano));
+            dbLedgerStorageStats.getReadFromLocationIndexTime().addLatency(
+                    MathUtils.elapsedNanos(locationIndexStartNano), TimeUnit.NANOSECONDS);
         }
 
         long readEntryStartNano = MathUtils.nowInNano();
         try {
             entry = entryLogger.readEntry(ledgerId, entryId, entryLocation);
         } finally {
-            dbLedgerStorageStats.getReadFromEntryLogTime().add(MathUtils.elapsedNanos(readEntryStartNano));
+            dbLedgerStorageStats.getReadFromEntryLogTime().addLatency(
+                    MathUtils.elapsedNanos(readEntryStartNano), TimeUnit.NANOSECONDS);
         }
 
         readCache.put(ledgerId, entryId, entry);
@@ -634,7 +636,8 @@ public class SingleDirectoryDbLedgerStorage implements CompactableLedgerStorage
         } finally {
             dbLedgerStorageStats.getReadAheadBatchCountStats().registerSuccessfulValue(count);
             dbLedgerStorageStats.getReadAheadBatchSizeStats().registerSuccessfulValue(size);
-            dbLedgerStorageStats.getReadAheadTime().add(MathUtils.elapsedNanos(readAheadStartNano));
+            dbLedgerStorageStats.getReadAheadTime().addLatency(
+                    MathUtils.elapsedNanos(readAheadStartNano), TimeUnit.NANOSECONDS);
         }
     }
 
@@ -689,11 +692,13 @@ public class SingleDirectoryDbLedgerStorage implements CompactableLedgerStorage
         }
 
         long entryLocation = entryLocationIndex.getLocation(ledgerId, lastEntryId);
-        dbLedgerStorageStats.getReadFromLocationIndexTime().add(MathUtils.elapsedNanos(locationIndexStartNano));
+        dbLedgerStorageStats.getReadFromLocationIndexTime().addLatency(
+                MathUtils.elapsedNanos(locationIndexStartNano), TimeUnit.NANOSECONDS);
 
         long readEntryStartNano = MathUtils.nowInNano();
         ByteBuf content = entryLogger.readEntry(ledgerId, lastEntryId, entryLocation);
-        dbLedgerStorageStats.getReadFromEntryLogTime().add(MathUtils.elapsedNanos(readEntryStartNano));
+        dbLedgerStorageStats.getReadFromEntryLogTime().addLatency(
+                MathUtils.elapsedNanos(readEntryStartNano), TimeUnit.NANOSECONDS);
         return content;
     }
 
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/server/service/ScrubberService.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/server/service/ScrubberService.java
index 4c027a6140..68ebc9fa91 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/server/service/ScrubberService.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/server/service/ScrubberService.java
@@ -105,7 +105,7 @@ public class ScrubberService extends ServerLifecycleComponent {
         try {
             List<LedgerStorage.DetectedInconsistency> errors = ledgerStorage.localConsistencyCheck(scrubRateLimiter);
             if (errors.size() > 0) {
-                errorCounter.add(errors.size());
+                errorCounter.addCount(errors.size());
                 LOG.error("Found inconsistency during localConsistencyCheck:");
                 for (LedgerStorage.DetectedInconsistency error : errors) {
                     LOG.error("Ledger {}, entry {}: ", error.getLedgerId(), error.getEntryId(), error.getException());
diff --git a/stats/bookkeeper-stats-api/src/main/java/org/apache/bookkeeper/stats/Counter.java b/stats/bookkeeper-stats-api/src/main/java/org/apache/bookkeeper/stats/Counter.java
index 7160d7ccc1..8f70f44e69 100644
--- a/stats/bookkeeper-stats-api/src/main/java/org/apache/bookkeeper/stats/Counter.java
+++ b/stats/bookkeeper-stats-api/src/main/java/org/apache/bookkeeper/stats/Counter.java
@@ -16,6 +16,8 @@
  */
 package org.apache.bookkeeper.stats;
 
+import java.util.concurrent.TimeUnit;
+
 /**
  * Simple stats that require only increment and decrement
  * functions on a Long. Metrics like the number of topics, persist queue size
@@ -41,7 +43,15 @@ public interface Counter {
      * Add delta to the value associated with this stat.
      * @param delta
      */
-    void add(long delta);
+    void addCount(long delta);
+
+    /**
+     * An operation succeeded with the given eventLatency. Update
+     * stats to reflect the same
+     * @param eventLatency The event latency
+     * @param unit
+     */
+    void addLatency(long eventLatency, TimeUnit unit);
 
     /**
      * Get the value associated with this stat.
diff --git a/stats/bookkeeper-stats-api/src/main/java/org/apache/bookkeeper/stats/NullStatsLogger.java b/stats/bookkeeper-stats-api/src/main/java/org/apache/bookkeeper/stats/NullStatsLogger.java
index ba6be364d3..a130b76b67 100644
--- a/stats/bookkeeper-stats-api/src/main/java/org/apache/bookkeeper/stats/NullStatsLogger.java
+++ b/stats/bookkeeper-stats-api/src/main/java/org/apache/bookkeeper/stats/NullStatsLogger.java
@@ -88,7 +88,12 @@ public class NullStatsLogger implements StatsLogger {
         }
 
         @Override
-        public void add(long delta) {
+        public void addCount(long delta) {
+            // nop
+        }
+
+        @Override
+        public void addLatency(long eventLatency, TimeUnit unit) {
             // nop
         }
 
diff --git a/stats/bookkeeper-stats-providers/codahale-metrics-provider/src/main/java/org/apache/bookkeeper/stats/codahale/CodahaleStatsLogger.java b/stats/bookkeeper-stats-providers/codahale-metrics-provider/src/main/java/org/apache/bookkeeper/stats/codahale/CodahaleStatsLogger.java
index ade182d1c7..4851362efd 100644
--- a/stats/bookkeeper-stats-providers/codahale-metrics-provider/src/main/java/org/apache/bookkeeper/stats/codahale/CodahaleStatsLogger.java
+++ b/stats/bookkeeper-stats-providers/codahale-metrics-provider/src/main/java/org/apache/bookkeeper/stats/codahale/CodahaleStatsLogger.java
@@ -20,6 +20,7 @@ import static com.codahale.metrics.MetricRegistry.name;
 
 import com.codahale.metrics.MetricRegistry;
 import com.codahale.metrics.Timer;
+import java.util.concurrent.TimeUnit;
 import org.apache.bookkeeper.stats.Counter;
 import org.apache.bookkeeper.stats.Gauge;
 import org.apache.bookkeeper.stats.OpStatsLogger;
@@ -70,9 +71,15 @@ public class CodahaleStatsLogger implements StatsLogger {
             }
 
             @Override
-            public void add(long delta) {
+            public void addCount(long delta) {
                 c.inc(delta);
             }
+
+            @Override
+            public void addLatency(long eventLatency, TimeUnit unit) {
+                long valueMillis = unit.toMillis(eventLatency);
+                c.inc(valueMillis);
+            }
         };
     }
 
diff --git a/stats/bookkeeper-stats-providers/prometheus-metrics-provider/src/main/java/org/apache/bookkeeper/stats/prometheus/LongAdderCounter.java b/stats/bookkeeper-stats-providers/prometheus-metrics-provider/src/main/java/org/apache/bookkeeper/stats/prometheus/LongAdderCounter.java
index 9e4f28c54e..687931ef5d 100644
--- a/stats/bookkeeper-stats-providers/prometheus-metrics-provider/src/main/java/org/apache/bookkeeper/stats/prometheus/LongAdderCounter.java
+++ b/stats/bookkeeper-stats-providers/prometheus-metrics-provider/src/main/java/org/apache/bookkeeper/stats/prometheus/LongAdderCounter.java
@@ -17,6 +17,7 @@
 package org.apache.bookkeeper.stats.prometheus;
 
 import java.util.Map;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.LongAdder;
 import org.apache.bookkeeper.stats.Counter;
 
@@ -54,10 +55,20 @@ public class LongAdderCounter implements Counter {
     }
 
     @Override
-    public void add(long delta) {
+    public void addCount(long delta) {
         counter.add(delta);
     }
 
+    /**
+     * When counter is used to count time.
+     * consistent with the {@link DataSketchesOpStatsLogger#registerSuccessfulEvent(long, TimeUnit)} 's logic
+     * */
+    @Override
+    public void addLatency(long eventLatency, TimeUnit unit) {
+        long valueMillis = unit.toMillis(eventLatency);
+        counter.add(valueMillis);
+    }
+
     @Override
     public Long get() {
         return counter.sum();
diff --git a/stats/bookkeeper-stats-providers/prometheus-metrics-provider/src/main/java/org/apache/bookkeeper/stats/prometheus/ThreadScopedLongAdderCounter.java b/stats/bookkeeper-stats-providers/prometheus-metrics-provider/src/main/java/org/apache/bookkeeper/stats/prometheus/ThreadScopedLongAdderCounter.java
index 5b0110da2b..ad717e55a5 100644
--- a/stats/bookkeeper-stats-providers/prometheus-metrics-provider/src/main/java/org/apache/bookkeeper/stats/prometheus/ThreadScopedLongAdderCounter.java
+++ b/stats/bookkeeper-stats-providers/prometheus-metrics-provider/src/main/java/org/apache/bookkeeper/stats/prometheus/ThreadScopedLongAdderCounter.java
@@ -18,6 +18,7 @@ package org.apache.bookkeeper.stats.prometheus;
 
 import java.util.HashMap;
 import java.util.Map;
+import java.util.concurrent.TimeUnit;
 import org.apache.bookkeeper.stats.Counter;
 import org.apache.bookkeeper.stats.ThreadRegistry;
 
@@ -65,8 +66,13 @@ public class ThreadScopedLongAdderCounter implements Counter {
     }
 
     @Override
-    public void add(long delta) {
-        getCounter().add(delta);
+    public void addCount(long delta) {
+        getCounter().addCount(delta);
+    }
+
+    @Override
+    public void addLatency(long eventLatency, TimeUnit unit) {
+        getCounter().addLatency(eventLatency, unit);
     }
 
     @Override
diff --git a/stats/bookkeeper-stats-providers/prometheus-metrics-provider/src/test/java/org/apache/bookkeeper/stats/prometheus/TestPrometheusMetricsProvider.java b/stats/bookkeeper-stats-providers/prometheus-metrics-provider/src/test/java/org/apache/bookkeeper/stats/prometheus/TestPrometheusMetricsProvider.java
index 167b79295f..93084fa053 100644
--- a/stats/bookkeeper-stats-providers/prometheus-metrics-provider/src/test/java/org/apache/bookkeeper/stats/prometheus/TestPrometheusMetricsProvider.java
+++ b/stats/bookkeeper-stats-providers/prometheus-metrics-provider/src/test/java/org/apache/bookkeeper/stats/prometheus/TestPrometheusMetricsProvider.java
@@ -25,6 +25,7 @@ import java.io.StringWriter;
 import java.nio.ByteBuffer;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.concurrent.TimeUnit;
 import lombok.Cleanup;
 import org.apache.bookkeeper.stats.Counter;
 import org.apache.bookkeeper.stats.StatsLogger;
@@ -98,7 +99,16 @@ public class TestPrometheusMetricsProvider {
         assertEquals(1L, counter.get().longValue());
         counter.dec();
         assertEquals(0L, counter.get().longValue());
-        counter.add(3);
+        counter.addCount(3);
+        assertEquals(3L, counter.get().longValue());
+    }
+
+    @Test
+    public void testCounter2() {
+        LongAdderCounter counter = new LongAdderCounter(Collections.emptyMap());
+        long value = counter.get();
+        assertEquals(0L, value);
+        counter.addLatency(3 * 1000 * 1000L, TimeUnit.NANOSECONDS);
         assertEquals(3L, counter.get().longValue());
     }
 
diff --git a/stream/distributedlog/common/src/main/java/org/apache/distributedlog/common/stats/BroadCastStatsLogger.java b/stream/distributedlog/common/src/main/java/org/apache/distributedlog/common/stats/BroadCastStatsLogger.java
index 5c9d035d48..55bf6dd508 100644
--- a/stream/distributedlog/common/src/main/java/org/apache/distributedlog/common/stats/BroadCastStatsLogger.java
+++ b/stream/distributedlog/common/src/main/java/org/apache/distributedlog/common/stats/BroadCastStatsLogger.java
@@ -127,9 +127,16 @@ public class BroadCastStatsLogger {
                 }
 
                 @Override
-                public void add(long l) {
-                    firstCounter.add(l);
-                    secondCounter.add(l);
+                public void addCount(long l) {
+                    firstCounter.addCount(l);
+                    secondCounter.addCount(l);
+                }
+
+                @Override
+                public void addLatency(long eventLatency, TimeUnit unit) {
+                    long valueMillis = unit.toMillis(eventLatency);
+                    firstCounter.addCount(valueMillis);
+                    secondCounter.addCount(valueMillis);
                 }
 
                 @Override
diff --git a/stream/distributedlog/core/src/main/java/org/apache/distributedlog/BKAsyncLogWriter.java b/stream/distributedlog/core/src/main/java/org/apache/distributedlog/BKAsyncLogWriter.java
index 780107ed84..819a43c15a 100644
--- a/stream/distributedlog/core/src/main/java/org/apache/distributedlog/BKAsyncLogWriter.java
+++ b/stream/distributedlog/core/src/main/java/org/apache/distributedlog/BKAsyncLogWriter.java
@@ -367,7 +367,7 @@ class BKAsyncLogWriter extends BKAbstractLogWriter implements AsyncLogWriter {
                             FutureUtils.complete(rollingFuture, writer);
                         }
                         rollingFuture = null;
-                        pendingRequestDispatch.add(pendingRequests.size());
+                        pendingRequestDispatch.addCount(pendingRequests.size());
                         pendingRequests = null;
                     }
                 } catch (IOException ioe) {
@@ -395,7 +395,7 @@ class BKAsyncLogWriter extends BKAbstractLogWriter implements AsyncLogWriter {
             rollingFuture = null;
         }
 
-        pendingRequestDispatch.add(pendingRequestsSnapshot.size());
+        pendingRequestDispatch.addCount(pendingRequestsSnapshot.size());
 
         // After erroring out the writer above, no more requests
         // will be enqueued to pendingRequests