You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@bookkeeper.apache.org by si...@apache.org on 2018/11/29 02:41:38 UTC

[bookkeeper] branch master updated: [STATS] Add @StatsDoc annotation for bookie server request stats

This is an automated email from the ASF dual-hosted git repository.

sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git


The following commit(s) were added to refs/heads/master by this push:
     new 8979e9e  [STATS] Add @StatsDoc annotation for bookie server request stats
8979e9e is described below

commit 8979e9e0dc7f1c7e6bae2636c508ca529593e17a
Author: Sijie Guo <gu...@gmail.com>
AuthorDate: Wed Nov 28 18:41:34 2018 -0800

    [STATS] Add @StatsDoc annotation for bookie server request stats
    
    
    
    Descriptions of the changes in this PR:
    
    *Motivation*
    
    As part of [BP-36](https://github.com/apache/bookkeeper/issues/1785), this PR documents the request stats
    for the BookKeeper server.
    
    *Changes*
    
    - add `parent` and `happensAfter` to StatsDoc
    - convert bookie request stats to use StatsDoc for documenting metrics
    
    Master Issue: #1785
    
    
    
    
    Reviewers: Jia Zhai <None>
    
    This closes #1839 from sijie/bp36_add_parent_and_happensafter
---
 .../bookkeeper/bookie/stats/BookieStats.java       |  14 +-
 .../bookkeeper/proto/BookieRequestProcessor.java   | 185 ++---------
 .../bookkeeper/proto/ForceLedgerProcessorV3.java   |  11 +-
 .../bookkeeper/proto/GetBookieInfoProcessorV3.java |  10 +-
 .../proto/LongPollReadEntryProcessorV3.java        |   6 +-
 .../bookkeeper/proto/PacketProcessorBase.java      |   2 +-
 .../bookkeeper/proto/PacketProcessorBaseV3.java    |   6 +-
 .../bookkeeper/proto/ReadEntryProcessor.java       |  13 +-
 .../bookkeeper/proto/ReadEntryProcessorV3.java     |  20 +-
 .../bookkeeper/proto/ReadLacProcessorV3.java       |  10 +-
 .../org/apache/bookkeeper/proto/RequestStats.java  | 342 +++++++++++++++++++++
 .../bookkeeper/proto/WriteEntryProcessor.java      |  18 +-
 .../bookkeeper/proto/WriteEntryProcessorV3.java    |  12 +-
 .../bookkeeper/proto/WriteLacProcessorV3.java      |  19 +-
 .../proto/ForceLedgerProcessorV3Test.java          |   5 +-
 .../bookkeeper/proto/WriteEntryProcessorTest.java  |   5 +-
 .../proto/WriteEntryProcessorV3Test.java           |   7 +-
 .../bookkeeper/stats/annotations/StatsDoc.java     |  20 ++
 18 files changed, 470 insertions(+), 235 deletions(-)

diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/stats/BookieStats.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/stats/BookieStats.java
index 72921d7..5e033e9 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/stats/BookieStats.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/stats/BookieStats.java
@@ -19,6 +19,7 @@
 
 package org.apache.bookkeeper.bookie.stats;
 
+import static org.apache.bookkeeper.bookie.BookKeeperServerStats.ADD_ENTRY;
 import static org.apache.bookkeeper.bookie.BookKeeperServerStats.BOOKIE_ADD_ENTRY;
 import static org.apache.bookkeeper.bookie.BookKeeperServerStats.BOOKIE_ADD_ENTRY_BYTES;
 import static org.apache.bookkeeper.bookie.BookKeeperServerStats.BOOKIE_FORCE_LEDGER;
@@ -28,6 +29,7 @@ import static org.apache.bookkeeper.bookie.BookKeeperServerStats.BOOKIE_RECOVERY
 import static org.apache.bookkeeper.bookie.BookKeeperServerStats.BOOKIE_SCOPE;
 import static org.apache.bookkeeper.bookie.BookKeeperServerStats.CATEGORY_SERVER;
 import static org.apache.bookkeeper.bookie.BookKeeperServerStats.READ_BYTES;
+import static org.apache.bookkeeper.bookie.BookKeeperServerStats.READ_ENTRY;
 import static org.apache.bookkeeper.bookie.BookKeeperServerStats.WRITE_BYTES;
 
 import lombok.Getter;
@@ -56,11 +58,19 @@ public class BookieStats {
     @StatsDoc(name = BOOKIE_FORCE_LEDGER, help = "total force operations occurred on a bookie")
     private final Counter forceLedgerOps;
     // Bookie Operation Latency Stats
-    @StatsDoc(name = BOOKIE_ADD_ENTRY, help = "operations stats of AddEntry on a bookie")
+    @StatsDoc(
+        name = BOOKIE_ADD_ENTRY,
+        help = "operations stats of AddEntry on a bookie",
+        parent = ADD_ENTRY
+    )
     private final OpStatsLogger addEntryStats;
     @StatsDoc(name = BOOKIE_RECOVERY_ADD_ENTRY, help = "operation stats of RecoveryAddEntry on a bookie")
     private final OpStatsLogger recoveryAddEntryStats;
-    @StatsDoc(name = BOOKIE_READ_ENTRY, help = "operation stats of ReadEntry on a bookie")
+    @StatsDoc(
+        name = BOOKIE_READ_ENTRY,
+        help = "operation stats of ReadEntry on a bookie",
+        parent = READ_ENTRY
+    )
     private final OpStatsLogger readEntryStats;
     // Bookie Operation Bytes Stats
     @StatsDoc(name = BOOKIE_ADD_ENTRY_BYTES, help = "bytes stats of AddEntry on a bookie")
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieRequestProcessor.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieRequestProcessor.java
index 78b35ec..b883f74 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieRequestProcessor.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieRequestProcessor.java
@@ -21,34 +21,6 @@
 package org.apache.bookkeeper.proto;
 
 import static com.google.common.base.Preconditions.checkArgument;
-import static org.apache.bookkeeper.bookie.BookKeeperServerStats.ADD_ENTRY;
-import static org.apache.bookkeeper.bookie.BookKeeperServerStats.ADD_ENTRY_BLOCKED;
-import static org.apache.bookkeeper.bookie.BookKeeperServerStats.ADD_ENTRY_BLOCKED_WAIT;
-import static org.apache.bookkeeper.bookie.BookKeeperServerStats.ADD_ENTRY_IN_PROGRESS;
-import static org.apache.bookkeeper.bookie.BookKeeperServerStats.ADD_ENTRY_REQUEST;
-import static org.apache.bookkeeper.bookie.BookKeeperServerStats.CHANNEL_WRITE;
-import static org.apache.bookkeeper.bookie.BookKeeperServerStats.FORCE_LEDGER;
-import static org.apache.bookkeeper.bookie.BookKeeperServerStats.FORCE_LEDGER_REQUEST;
-import static org.apache.bookkeeper.bookie.BookKeeperServerStats.GET_BOOKIE_INFO;
-import static org.apache.bookkeeper.bookie.BookKeeperServerStats.GET_BOOKIE_INFO_REQUEST;
-import static org.apache.bookkeeper.bookie.BookKeeperServerStats.READ_ENTRY;
-import static org.apache.bookkeeper.bookie.BookKeeperServerStats.READ_ENTRY_BLOCKED;
-import static org.apache.bookkeeper.bookie.BookKeeperServerStats.READ_ENTRY_BLOCKED_WAIT;
-import static org.apache.bookkeeper.bookie.BookKeeperServerStats.READ_ENTRY_FENCE_READ;
-import static org.apache.bookkeeper.bookie.BookKeeperServerStats.READ_ENTRY_FENCE_REQUEST;
-import static org.apache.bookkeeper.bookie.BookKeeperServerStats.READ_ENTRY_FENCE_WAIT;
-import static org.apache.bookkeeper.bookie.BookKeeperServerStats.READ_ENTRY_IN_PROGRESS;
-import static org.apache.bookkeeper.bookie.BookKeeperServerStats.READ_ENTRY_LONG_POLL_PRE_WAIT;
-import static org.apache.bookkeeper.bookie.BookKeeperServerStats.READ_ENTRY_LONG_POLL_READ;
-import static org.apache.bookkeeper.bookie.BookKeeperServerStats.READ_ENTRY_LONG_POLL_REQUEST;
-import static org.apache.bookkeeper.bookie.BookKeeperServerStats.READ_ENTRY_LONG_POLL_WAIT;
-import static org.apache.bookkeeper.bookie.BookKeeperServerStats.READ_ENTRY_REQUEST;
-import static org.apache.bookkeeper.bookie.BookKeeperServerStats.READ_ENTRY_SCHEDULING_DELAY;
-import static org.apache.bookkeeper.bookie.BookKeeperServerStats.READ_LAC;
-import static org.apache.bookkeeper.bookie.BookKeeperServerStats.READ_LAC_REQUEST;
-import static org.apache.bookkeeper.bookie.BookKeeperServerStats.READ_LAST_ENTRY_NOENTRY_ERROR;
-import static org.apache.bookkeeper.bookie.BookKeeperServerStats.WRITE_LAC;
-import static org.apache.bookkeeper.bookie.BookKeeperServerStats.WRITE_LAC_REQUEST;
 import static org.apache.bookkeeper.proto.RequestUtils.hasFlag;
 
 import com.google.common.annotations.VisibleForTesting;
@@ -68,7 +40,6 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.RejectedExecutionException;
 import java.util.concurrent.Semaphore;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
 import java.util.function.Consumer;
 
 import lombok.AccessLevel;
@@ -80,9 +51,6 @@ import org.apache.bookkeeper.common.util.MathUtils;
 import org.apache.bookkeeper.common.util.OrderedExecutor;
 import org.apache.bookkeeper.conf.ServerConfiguration;
 import org.apache.bookkeeper.processor.RequestProcessor;
-import org.apache.bookkeeper.stats.Counter;
-import org.apache.bookkeeper.stats.Gauge;
-import org.apache.bookkeeper.stats.OpStatsLogger;
 import org.apache.bookkeeper.stats.StatsLogger;
 import org.apache.bookkeeper.tls.SecurityException;
 import org.apache.bookkeeper.tls.SecurityHandlerFactory;
@@ -146,37 +114,8 @@ public class BookieRequestProcessor implements RequestProcessor {
     // Expose Stats
     private final BKStats bkStats = BKStats.getInstance();
     private final boolean statsEnabled;
-    private final OpStatsLogger addRequestStats;
-    private final OpStatsLogger addEntryStats;
-    final OpStatsLogger readRequestStats;
-    final OpStatsLogger readEntryStats;
-    final OpStatsLogger forceLedgerStats;
-    final OpStatsLogger forceLedgerRequestStats;
-    final OpStatsLogger fenceReadRequestStats;
-    final OpStatsLogger fenceReadEntryStats;
-    final OpStatsLogger fenceReadWaitStats;
-    final OpStatsLogger readEntrySchedulingDelayStats;
-    final OpStatsLogger longPollPreWaitStats;
-    final OpStatsLogger longPollWaitStats;
-    final OpStatsLogger longPollReadStats;
-    final OpStatsLogger longPollReadRequestStats;
-    final Counter readLastEntryNoEntryErrorCounter;
-    final OpStatsLogger writeLacRequestStats;
-    final OpStatsLogger writeLacStats;
-    final OpStatsLogger readLacRequestStats;
-    final OpStatsLogger readLacStats;
-    final OpStatsLogger getBookieInfoRequestStats;
-    final OpStatsLogger getBookieInfoStats;
-    final OpStatsLogger channelWriteStats;
-    final OpStatsLogger addEntryBlockedStats;
-    final OpStatsLogger readEntryBlockedStats;
-
-    final AtomicInteger addsInProgress = new AtomicInteger(0);
-    final AtomicInteger maxAddsInProgress = new AtomicInteger(0);
-    final AtomicInteger addsBlocked = new AtomicInteger(0);
-    final AtomicInteger readsInProgress = new AtomicInteger(0);
-    final AtomicInteger readsBlocked = new AtomicInteger(0);
-    final AtomicInteger maxReadsInProgress = new AtomicInteger(0);
+
+    private final RequestStats requestStats;
 
     final Semaphore addsSemaphore;
     final Semaphore readsSemaphore;
@@ -248,86 +187,13 @@ public class BookieRequestProcessor implements RequestProcessor {
 
         // Expose Stats
         this.statsEnabled = serverCfg.isStatisticsEnabled();
-        this.addEntryStats = statsLogger.getOpStatsLogger(ADD_ENTRY);
-        this.addRequestStats = statsLogger.getOpStatsLogger(ADD_ENTRY_REQUEST);
-        this.readEntryStats = statsLogger.getOpStatsLogger(READ_ENTRY);
-        this.forceLedgerStats = statsLogger.getOpStatsLogger(FORCE_LEDGER);
-        this.forceLedgerRequestStats = statsLogger.getOpStatsLogger(FORCE_LEDGER_REQUEST);
-        this.readRequestStats = statsLogger.getOpStatsLogger(READ_ENTRY_REQUEST);
-        this.fenceReadEntryStats = statsLogger.getOpStatsLogger(READ_ENTRY_FENCE_READ);
-        this.fenceReadRequestStats = statsLogger.getOpStatsLogger(READ_ENTRY_FENCE_REQUEST);
-        this.fenceReadWaitStats = statsLogger.getOpStatsLogger(READ_ENTRY_FENCE_WAIT);
-        this.readEntrySchedulingDelayStats = statsLogger.getOpStatsLogger(READ_ENTRY_SCHEDULING_DELAY);
-        this.longPollPreWaitStats = statsLogger.getOpStatsLogger(READ_ENTRY_LONG_POLL_PRE_WAIT);
-        this.longPollWaitStats = statsLogger.getOpStatsLogger(READ_ENTRY_LONG_POLL_WAIT);
-        this.longPollReadStats = statsLogger.getOpStatsLogger(READ_ENTRY_LONG_POLL_READ);
-        this.longPollReadRequestStats = statsLogger.getOpStatsLogger(READ_ENTRY_LONG_POLL_REQUEST);
-        this.readLastEntryNoEntryErrorCounter = statsLogger.getCounter(READ_LAST_ENTRY_NOENTRY_ERROR);
-        this.writeLacStats = statsLogger.getOpStatsLogger(WRITE_LAC);
-        this.writeLacRequestStats = statsLogger.getOpStatsLogger(WRITE_LAC_REQUEST);
-        this.readLacStats = statsLogger.getOpStatsLogger(READ_LAC);
-        this.readLacRequestStats = statsLogger.getOpStatsLogger(READ_LAC_REQUEST);
-        this.getBookieInfoStats = statsLogger.getOpStatsLogger(GET_BOOKIE_INFO);
-        this.getBookieInfoRequestStats = statsLogger.getOpStatsLogger(GET_BOOKIE_INFO_REQUEST);
-        this.channelWriteStats = statsLogger.getOpStatsLogger(CHANNEL_WRITE);
-
-        this.addEntryBlockedStats = statsLogger.getOpStatsLogger(ADD_ENTRY_BLOCKED_WAIT);
-        this.readEntryBlockedStats = statsLogger.getOpStatsLogger(READ_ENTRY_BLOCKED_WAIT);
+        this.requestStats = new RequestStats(statsLogger);
 
         int maxAdds = serverCfg.getMaxAddsInProgressLimit();
         addsSemaphore = maxAdds > 0 ? new Semaphore(maxAdds, true) : null;
 
         int maxReads = serverCfg.getMaxReadsInProgressLimit();
         readsSemaphore = maxReads > 0 ? new Semaphore(maxReads, true) : null;
-
-        statsLogger.registerGauge(ADD_ENTRY_IN_PROGRESS, new Gauge<Number>() {
-            @Override
-            public Number getDefaultValue() {
-                return 0;
-            }
-
-            @Override
-            public Number getSample() {
-                return addsInProgress;
-            }
-        });
-
-        statsLogger.registerGauge(ADD_ENTRY_BLOCKED, new Gauge<Number>() {
-            @Override
-            public Number getDefaultValue() {
-                return 0;
-            }
-
-            @Override
-            public Number getSample() {
-                return addsBlocked;
-            }
-        });
-
-        statsLogger.registerGauge(READ_ENTRY_IN_PROGRESS, new Gauge<Number>() {
-            @Override
-            public Number getDefaultValue() {
-                return 0;
-            }
-
-            @Override
-            public Number getSample() {
-                return readsInProgress;
-            }
-        });
-
-        statsLogger.registerGauge(READ_ENTRY_BLOCKED, new Gauge<Number>() {
-            @Override
-            public Number getDefaultValue() {
-                return 0;
-            }
-
-            @Override
-            public Number getSample() {
-                return readsBlocked;
-            }
-        });
-
     }
 
     protected void onAddRequestStart(Channel channel) {
@@ -336,21 +202,19 @@ public class BookieRequestProcessor implements RequestProcessor {
                 final long throttlingStartTimeNanos = MathUtils.nowInNano();
                 channel.config().setAutoRead(false);
                 LOG.info("Too many add requests in progress, disabling autoread on channel {}", channel);
-                addsBlocked.incrementAndGet();
+                requestStats.blockAddRequest();
                 addsSemaphore.acquireUninterruptibly();
                 channel.config().setAutoRead(true);
                 final long delayNanos = MathUtils.elapsedNanos(throttlingStartTimeNanos);
                 LOG.info("Re-enabled autoread on channel {} after AddRequest delay of {} nanos", channel, delayNanos);
-                addEntryBlockedStats.registerSuccessfulEvent(delayNanos, TimeUnit.NANOSECONDS);
-                addsBlocked.decrementAndGet();
+                requestStats.unblockAddRequest(delayNanos);
             }
         }
-        final int curr = addsInProgress.incrementAndGet();
-        maxAddsInProgress.accumulateAndGet(curr, Integer::max);
+        requestStats.trackAddRequest();
     }
 
     protected void onAddRequestFinish() {
-        addsInProgress.decrementAndGet();
+        requestStats.untrackAddRequest();
         if (addsSemaphore != null) {
             addsSemaphore.release();
         }
@@ -362,21 +226,19 @@ public class BookieRequestProcessor implements RequestProcessor {
                 final long throttlingStartTimeNanos = MathUtils.nowInNano();
                 channel.config().setAutoRead(false);
                 LOG.info("Too many read requests in progress, disabling autoread on channel {}", channel);
-                readsBlocked.incrementAndGet();
+                requestStats.blockReadRequest();
                 readsSemaphore.acquireUninterruptibly();
                 channel.config().setAutoRead(true);
                 final long delayNanos = MathUtils.elapsedNanos(throttlingStartTimeNanos);
                 LOG.info("Re-enabled autoread on channel {} after ReadRequest delay of {} nanos", channel, delayNanos);
-                readEntryBlockedStats.registerSuccessfulEvent(delayNanos, TimeUnit.NANOSECONDS);
-                readsBlocked.decrementAndGet();
+                requestStats.unblockReadRequest(delayNanos);
             }
         }
-        final int curr = readsInProgress.incrementAndGet();
-        maxReadsInProgress.accumulateAndGet(curr, Integer::max);
+        requestStats.trackReadRequest();
     }
 
     protected void onReadRequestFinish() {
-        readsInProgress.decrementAndGet();
+        requestStats.untrackReadRequest();
         if (readsSemaphore != null) {
             readsSemaphore.release();
         }
@@ -384,12 +246,12 @@ public class BookieRequestProcessor implements RequestProcessor {
 
     @VisibleForTesting
     int maxAddsInProgressCount() {
-        return maxAddsInProgress.get();
+        return requestStats.maxAddsInProgressCount();
     }
 
     @VisibleForTesting
     int maxReadsInProgressCount() {
-        return maxReadsInProgress.get();
+        return requestStats.maxReadsInProgressCount();
     }
 
     @Override
@@ -576,7 +438,7 @@ public class BookieRequestProcessor implements RequestProcessor {
                         .setStatus(addResponse.getStatus())
                         .setAddResponse(addResponse);
                 BookkeeperProtocol.Response resp = response.build();
-                write.sendResponse(addResponse.getStatus(), resp, addRequestStats);
+                write.sendResponse(addResponse.getStatus(), resp, requestStats.getAddRequestStats());
             }
         }
     }
@@ -610,7 +472,10 @@ public class BookieRequestProcessor implements RequestProcessor {
                         .setStatus(forceLedgerResponse.getStatus())
                         .setForceLedgerResponse(forceLedgerResponse);
                 BookkeeperProtocol.Response resp = response.build();
-                forceLedger.sendResponse(forceLedgerResponse.getStatus(), resp, forceLedgerRequestStats);
+                forceLedger.sendResponse(
+                    forceLedgerResponse.getStatus(),
+                    resp,
+                    requestStats.getForceLedgerRequestStats());
             }
         }
     }
@@ -660,7 +525,7 @@ public class BookieRequestProcessor implements RequestProcessor {
                     .setStatus(readResponse.getStatus())
                     .setReadResponse(readResponse);
                 BookkeeperProtocol.Response resp = response.build();
-                read.sendResponse(readResponse.getStatus(), resp, readRequestStats);
+                read.sendResponse(readResponse.getStatus(), resp, requestStats.getReadRequestStats());
             }
         }
     }
@@ -740,8 +605,10 @@ public class BookieRequestProcessor implements RequestProcessor {
                             r.entryId);
                 }
 
-                write.sendResponse(BookieProtocol.ETOOMANYREQUESTS,
-                        ResponseBuilder.buildErrorResponse(BookieProtocol.ETOOMANYREQUESTS, r), addRequestStats);
+                write.sendResponse(
+                    BookieProtocol.ETOOMANYREQUESTS,
+                    ResponseBuilder.buildErrorResponse(BookieProtocol.ETOOMANYREQUESTS, r),
+                    requestStats.getAddRequestStats());
             }
         }
     }
@@ -770,8 +637,10 @@ public class BookieRequestProcessor implements RequestProcessor {
                             r.entryId);
                 }
 
-                read.sendResponse(BookieProtocol.ETOOMANYREQUESTS,
-                        ResponseBuilder.buildErrorResponse(BookieProtocol.ETOOMANYREQUESTS, r), readRequestStats);
+                read.sendResponse(
+                    BookieProtocol.ETOOMANYREQUESTS,
+                    ResponseBuilder.buildErrorResponse(BookieProtocol.ETOOMANYREQUESTS, r),
+                    requestStats.getReadRequestStats());
             }
         }
     }
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ForceLedgerProcessorV3.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ForceLedgerProcessorV3.java
index 0c8ef01..f889172 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ForceLedgerProcessorV3.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ForceLedgerProcessorV3.java
@@ -67,11 +67,11 @@ class ForceLedgerProcessorV3 extends PacketProcessorBaseV3 implements Runnable {
                     "ledgerId must be {} but was {}", ledgerId, ledgerId1);
 
             if (BookieProtocol.EOK == rc) {
-                requestProcessor.getForceLedgerStats()
+                requestProcessor.getRequestStats().getForceLedgerStats()
                         .registerSuccessfulEvent(MathUtils.elapsedNanos(startTimeNanos),
                                 TimeUnit.NANOSECONDS);
             } else {
-                requestProcessor.getForceLedgerStats()
+                requestProcessor.getRequestStats().getForceLedgerStats()
                         .registerFailedEvent(MathUtils.elapsedNanos(startTimeNanos),
                                 TimeUnit.NANOSECONDS);
             }
@@ -94,7 +94,7 @@ class ForceLedgerProcessorV3 extends PacketProcessorBaseV3 implements Runnable {
                     .setStatus(forceLedgerResponse.getStatus())
                     .setForceLedgerResponse(forceLedgerResponse);
             Response resp = response.build();
-            sendResponse(status, resp, requestProcessor.getForceLedgerRequestStats());
+            sendResponse(status, resp, requestProcessor.getRequestStats().getForceLedgerRequestStats());
         };
         StatusCode status = null;
         try {
@@ -124,7 +124,10 @@ class ForceLedgerProcessorV3 extends PacketProcessorBaseV3 implements Runnable {
                     .setStatus(forceLedgerResponse.getStatus())
                     .setForceLedgerResponse(forceLedgerResponse);
             Response resp = response.build();
-            sendResponse(forceLedgerResponse.getStatus(), resp, requestProcessor.getForceLedgerRequestStats());
+            sendResponse(
+                forceLedgerResponse.getStatus(),
+                resp,
+                requestProcessor.getRequestStats().getForceLedgerRequestStats());
         }
     }
 
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/GetBookieInfoProcessorV3.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/GetBookieInfoProcessorV3.java
index d964957..fe315f6 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/GetBookieInfoProcessorV3.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/GetBookieInfoProcessorV3.java
@@ -54,8 +54,8 @@ public class GetBookieInfoProcessorV3 extends PacketProcessorBaseV3 implements R
 
         if (!isVersionCompatible()) {
             getBookieInfoResponse.setStatus(StatusCode.EBADVERSION);
-            requestProcessor.getGetBookieInfoStats().registerFailedEvent(MathUtils.elapsedNanos(startTimeNanos),
-                    TimeUnit.NANOSECONDS);
+            requestProcessor.getRequestStats().getGetBookieInfoStats()
+                .registerFailedEvent(MathUtils.elapsedNanos(startTimeNanos), TimeUnit.NANOSECONDS);
             return getBookieInfoResponse.build();
         }
 
@@ -80,8 +80,8 @@ public class GetBookieInfoProcessorV3 extends PacketProcessorBaseV3 implements R
         }
 
         getBookieInfoResponse.setStatus(status);
-        requestProcessor.getGetBookieInfoStats().registerSuccessfulEvent(MathUtils.elapsedNanos(startTimeNanos),
-                TimeUnit.NANOSECONDS);
+        requestProcessor.getRequestStats().getGetBookieInfoStats()
+            .registerSuccessfulEvent(MathUtils.elapsedNanos(startTimeNanos), TimeUnit.NANOSECONDS);
         return getBookieInfoResponse.build();
     }
 
@@ -98,6 +98,6 @@ public class GetBookieInfoProcessorV3 extends PacketProcessorBaseV3 implements R
                 .setGetBookieInfoResponse(getBookieInfoResponse);
         sendResponse(response.getStatus(),
                      response.build(),
-                     requestProcessor.getGetBookieInfoRequestStats());
+                     requestProcessor.getRequestStats().getGetBookieInfoRequestStats());
     }
 }
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/LongPollReadEntryProcessorV3.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/LongPollReadEntryProcessorV3.java
index fdbbd35..6f25d68 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/LongPollReadEntryProcessorV3.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/LongPollReadEntryProcessorV3.java
@@ -101,7 +101,7 @@ class LongPollReadEntryProcessorV3 extends ReadEntryProcessorV3 implements Watch
                     try {
                         return super.readEntry(readResponseBuilder, entryId, true, startTimeSw);
                     } catch (Bookie.NoEntryException e) {
-                        requestProcessor.readLastEntryNoEntryErrorCounter.inc();
+                        requestProcessor.getRequestStats().getReadLastEntryNoEntryErrorCounter().inc();
                         logger.info(
                                 "No entry found while piggyback reading entry {} from ledger {} : previous lac = {}",
                                 entryId, ledgerId, previousLAC);
@@ -153,7 +153,7 @@ class LongPollReadEntryProcessorV3 extends ReadEntryProcessorV3 implements Watch
                 return buildErrorResponse(StatusCode.EIO, startTimeSw);
             }
 
-            registerSuccessfulEvent(requestProcessor.longPollPreWaitStats, startTimeSw);
+            registerSuccessfulEvent(requestProcessor.getRequestStats().getLongPollPreWaitStats(), startTimeSw);
             lastPhaseStartTime.reset().start();
 
             if (watched) {
@@ -213,7 +213,7 @@ class LongPollReadEntryProcessorV3 extends ReadEntryProcessorV3 implements Watch
                 expirationTimerTask.cancel();
             }
 
-            registerEvent(timeout, requestProcessor.longPollWaitStats, lastPhaseStartTime);
+            registerEvent(timeout, requestProcessor.getRequestStats().getLongPollWaitStats(), lastPhaseStartTime);
             lastPhaseStartTime.reset().start();
         }
     }
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PacketProcessorBase.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PacketProcessorBase.java
index b7dee2d..54368a0 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PacketProcessorBase.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PacketProcessorBase.java
@@ -79,7 +79,7 @@ abstract class PacketProcessorBase<T extends Request> extends SafeRunnable {
         if (!isVersionCompatible()) {
             sendResponse(BookieProtocol.EBADVERSION,
                          ResponseBuilder.buildErrorResponse(BookieProtocol.EBADVERSION, request),
-                         requestProcessor.readRequestStats);
+                         requestProcessor.getRequestStats().getReadRequestStats());
             return;
         }
         processPacket();
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PacketProcessorBaseV3.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PacketProcessorBaseV3.java
index 7dc29a3..15765a2 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PacketProcessorBaseV3.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PacketProcessorBaseV3.java
@@ -79,7 +79,7 @@ public abstract class PacketProcessorBaseV3 extends SafeRunnable {
             if (!channel.isWritable()) {
                 LOGGER.warn("cannot write response to non-writable channel {} for request {}", channel,
                         StringUtils.requestToString(request));
-                requestProcessor.getChannelWriteStats()
+                requestProcessor.getRequestStats().getChannelWriteStats()
                         .registerFailedEvent(MathUtils.elapsedNanos(writeNanos), TimeUnit.NANOSECONDS);
                 statsLogger.registerFailedEvent(MathUtils.elapsedNanos(enqueueNanos), TimeUnit.NANOSECONDS);
                 return;
@@ -93,10 +93,10 @@ public abstract class PacketProcessorBaseV3 extends SafeRunnable {
             public void operationComplete(ChannelFuture future) throws Exception {
                 long writeElapsedNanos = MathUtils.elapsedNanos(writeNanos);
                 if (!future.isSuccess()) {
-                    requestProcessor.getChannelWriteStats()
+                    requestProcessor.getRequestStats().getChannelWriteStats()
                         .registerFailedEvent(writeElapsedNanos, TimeUnit.NANOSECONDS);
                 } else {
-                    requestProcessor.getChannelWriteStats()
+                    requestProcessor.getRequestStats().getChannelWriteStats()
                         .registerSuccessfulEvent(writeElapsedNanos, TimeUnit.NANOSECONDS);
                 }
                 if (StatusCode.EOK == code) {
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ReadEntryProcessor.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ReadEntryProcessor.java
index edeb8a6..6566c7b 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ReadEntryProcessor.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ReadEntryProcessor.java
@@ -20,7 +20,6 @@ package org.apache.bookkeeper.proto;
 import io.netty.buffer.ByteBuf;
 import io.netty.channel.Channel;
 import io.netty.util.Recycler;
-import io.netty.util.Recycler.Handle;
 import io.netty.util.ReferenceCountUtil;
 
 import java.io.IOException;
@@ -129,17 +128,17 @@ class ReadEntryProcessor extends PacketProcessorBase<ReadRequest> {
             LOG.trace("Read entry rc = {} for {}", errorCode, request);
         }
         if (errorCode == BookieProtocol.EOK) {
-            requestProcessor.readEntryStats.registerSuccessfulEvent(MathUtils.elapsedNanos(startTimeNanos),
-                    TimeUnit.NANOSECONDS);
+            requestProcessor.getRequestStats().getReadEntryStats()
+                .registerSuccessfulEvent(MathUtils.elapsedNanos(startTimeNanos), TimeUnit.NANOSECONDS);
             sendResponse(errorCode, ResponseBuilder.buildReadResponse(data, request),
-                         requestProcessor.readRequestStats);
+                         requestProcessor.getRequestStats().getReadRequestStats());
         } else {
             ReferenceCountUtil.release(data);
 
-            requestProcessor.readEntryStats.registerFailedEvent(MathUtils.elapsedNanos(startTimeNanos),
-                    TimeUnit.NANOSECONDS);
+            requestProcessor.getRequestStats().getReadEntryStats()
+                .registerFailedEvent(MathUtils.elapsedNanos(startTimeNanos), TimeUnit.NANOSECONDS);
             sendResponse(errorCode, ResponseBuilder.buildErrorResponse(errorCode, request),
-                         requestProcessor.readRequestStats);
+                         requestProcessor.getRequestStats().getReadRequestStats());
         }
         recycle();
     }
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ReadEntryProcessorV3.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ReadEntryProcessorV3.java
index e7e5653..88b7662 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ReadEntryProcessorV3.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ReadEntryProcessorV3.java
@@ -71,14 +71,14 @@ class ReadEntryProcessorV3 extends PacketProcessorBaseV3 {
         this.ledgerId = readRequest.getLedgerId();
         this.entryId = readRequest.getEntryId();
         if (RequestUtils.isFenceRequest(this.readRequest)) {
-            this.readStats = requestProcessor.fenceReadEntryStats;
-            this.reqStats = requestProcessor.fenceReadRequestStats;
+            this.readStats = requestProcessor.getRequestStats().getFenceReadEntryStats();
+            this.reqStats = requestProcessor.getRequestStats().getFenceReadRequestStats();
         } else if (readRequest.hasPreviousLAC()) {
-            this.readStats = requestProcessor.longPollReadStats;
-            this.reqStats = requestProcessor.longPollReadRequestStats;
+            this.readStats = requestProcessor.getRequestStats().getLongPollReadStats();
+            this.reqStats = requestProcessor.getRequestStats().getLongPollReadRequestStats();
         } else {
-            this.readStats = requestProcessor.readEntryStats;
-            this.reqStats = requestProcessor.readRequestStats;
+            this.readStats = requestProcessor.getRequestStats().getReadEntryStats();
+            this.reqStats = requestProcessor.getRequestStats().getReadRequestStats();
         }
 
         this.fenceThreadPool = fenceThreadPool;
@@ -246,7 +246,7 @@ class ReadEntryProcessorV3 extends PacketProcessorBaseV3 {
 
     @Override
     public void safeRun() {
-        requestProcessor.readEntrySchedulingDelayStats.registerSuccessfulEvent(
+        requestProcessor.getRequestStats().getReadEntrySchedulingDelayStats().registerSuccessfulEvent(
             MathUtils.elapsedNanos(enqueueNanos), TimeUnit.NANOSECONDS);
 
         if (!isVersionCompatible()) {
@@ -275,11 +275,11 @@ class ReadEntryProcessorV3 extends PacketProcessorBaseV3 {
         StatusCode status;
         if (!fenceResult) {
             status = StatusCode.EIO;
-            registerFailedEvent(requestProcessor.fenceReadWaitStats, lastPhaseStartTime);
+            registerFailedEvent(requestProcessor.getRequestStats().getFenceReadWaitStats(), lastPhaseStartTime);
         } else {
             status = StatusCode.EOK;
             readResponse.setBody(ByteString.copyFrom(entryBody.nioBuffer()));
-            registerSuccessfulEvent(requestProcessor.fenceReadWaitStats, lastPhaseStartTime);
+            registerSuccessfulEvent(requestProcessor.getRequestStats().getFenceReadWaitStats(), lastPhaseStartTime);
         }
 
         if (null != entryBody) {
@@ -296,7 +296,7 @@ class ReadEntryProcessorV3 extends PacketProcessorBaseV3 {
         // build the fence read response
         getFenceResponse(readResponse, entryBody, fenceResult);
         // register fence read stat
-        registerEvent(!fenceResult, requestProcessor.fenceReadEntryStats, startTimeSw);
+        registerEvent(!fenceResult, requestProcessor.getRequestStats().getFenceReadEntryStats(), startTimeSw);
         // send the fence read response
         sendResponse(readResponse.build());
     }
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ReadLacProcessorV3.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ReadLacProcessorV3.java
index 898ddb0..a3bc311 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ReadLacProcessorV3.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ReadLacProcessorV3.java
@@ -102,11 +102,11 @@ class ReadLacProcessorV3 extends PacketProcessorBaseV3 implements Runnable {
         }
 
         if (status == StatusCode.EOK) {
-            requestProcessor.readLacStats.registerSuccessfulEvent(MathUtils.elapsedNanos(startTimeNanos),
-                    TimeUnit.NANOSECONDS);
+            requestProcessor.getRequestStats().getReadLacStats()
+                .registerSuccessfulEvent(MathUtils.elapsedNanos(startTimeNanos), TimeUnit.NANOSECONDS);
         } else {
-            requestProcessor.readLacStats.registerFailedEvent(MathUtils.elapsedNanos(startTimeNanos),
-                    TimeUnit.NANOSECONDS);
+            requestProcessor.getRequestStats().getReadLacStats()
+                .registerFailedEvent(MathUtils.elapsedNanos(startTimeNanos), TimeUnit.NANOSECONDS);
         }
         // Finally set the status and return
         readLacResponse.setStatus(status);
@@ -126,6 +126,6 @@ class ReadLacProcessorV3 extends PacketProcessorBaseV3 implements Runnable {
             .setReadLacResponse(readLacResponse);
         sendResponse(response.getStatus(),
                 response.build(),
-                requestProcessor.readLacRequestStats);
+                requestProcessor.getRequestStats().getReadLacRequestStats());
     }
 }
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/RequestStats.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/RequestStats.java
new file mode 100644
index 0000000..1799e66
--- /dev/null
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/RequestStats.java
@@ -0,0 +1,342 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.bookkeeper.proto;
+
+import static org.apache.bookkeeper.bookie.BookKeeperServerStats.ADD_ENTRY;
+import static org.apache.bookkeeper.bookie.BookKeeperServerStats.ADD_ENTRY_BLOCKED;
+import static org.apache.bookkeeper.bookie.BookKeeperServerStats.ADD_ENTRY_BLOCKED_WAIT;
+import static org.apache.bookkeeper.bookie.BookKeeperServerStats.ADD_ENTRY_IN_PROGRESS;
+import static org.apache.bookkeeper.bookie.BookKeeperServerStats.ADD_ENTRY_REQUEST;
+import static org.apache.bookkeeper.bookie.BookKeeperServerStats.CATEGORY_SERVER;
+import static org.apache.bookkeeper.bookie.BookKeeperServerStats.CHANNEL_WRITE;
+import static org.apache.bookkeeper.bookie.BookKeeperServerStats.FORCE_LEDGER;
+import static org.apache.bookkeeper.bookie.BookKeeperServerStats.FORCE_LEDGER_REQUEST;
+import static org.apache.bookkeeper.bookie.BookKeeperServerStats.GET_BOOKIE_INFO;
+import static org.apache.bookkeeper.bookie.BookKeeperServerStats.GET_BOOKIE_INFO_REQUEST;
+import static org.apache.bookkeeper.bookie.BookKeeperServerStats.READ_ENTRY;
+import static org.apache.bookkeeper.bookie.BookKeeperServerStats.READ_ENTRY_BLOCKED;
+import static org.apache.bookkeeper.bookie.BookKeeperServerStats.READ_ENTRY_BLOCKED_WAIT;
+import static org.apache.bookkeeper.bookie.BookKeeperServerStats.READ_ENTRY_FENCE_READ;
+import static org.apache.bookkeeper.bookie.BookKeeperServerStats.READ_ENTRY_FENCE_REQUEST;
+import static org.apache.bookkeeper.bookie.BookKeeperServerStats.READ_ENTRY_FENCE_WAIT;
+import static org.apache.bookkeeper.bookie.BookKeeperServerStats.READ_ENTRY_IN_PROGRESS;
+import static org.apache.bookkeeper.bookie.BookKeeperServerStats.READ_ENTRY_LONG_POLL_PRE_WAIT;
+import static org.apache.bookkeeper.bookie.BookKeeperServerStats.READ_ENTRY_LONG_POLL_READ;
+import static org.apache.bookkeeper.bookie.BookKeeperServerStats.READ_ENTRY_LONG_POLL_REQUEST;
+import static org.apache.bookkeeper.bookie.BookKeeperServerStats.READ_ENTRY_LONG_POLL_WAIT;
+import static org.apache.bookkeeper.bookie.BookKeeperServerStats.READ_ENTRY_REQUEST;
+import static org.apache.bookkeeper.bookie.BookKeeperServerStats.READ_ENTRY_SCHEDULING_DELAY;
+import static org.apache.bookkeeper.bookie.BookKeeperServerStats.READ_LAC;
+import static org.apache.bookkeeper.bookie.BookKeeperServerStats.READ_LAC_REQUEST;
+import static org.apache.bookkeeper.bookie.BookKeeperServerStats.READ_LAST_ENTRY_NOENTRY_ERROR;
+import static org.apache.bookkeeper.bookie.BookKeeperServerStats.SERVER_SCOPE;
+import static org.apache.bookkeeper.bookie.BookKeeperServerStats.WRITE_LAC;
+import static org.apache.bookkeeper.bookie.BookKeeperServerStats.WRITE_LAC_REQUEST;
+
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import lombok.Getter;
+import org.apache.bookkeeper.stats.Counter;
+import org.apache.bookkeeper.stats.Gauge;
+import org.apache.bookkeeper.stats.OpStatsLogger;
+import org.apache.bookkeeper.stats.StatsLogger;
+import org.apache.bookkeeper.stats.annotations.StatsDoc;
+
+/**
+ * An umbrella class for request-related stats.
+ */
+@StatsDoc(
+    name = SERVER_SCOPE,
+    category = CATEGORY_SERVER,
+    help = "Bookie request stats"
+)
+@Getter
+public class RequestStats {
+
+    final AtomicInteger addsInProgress = new AtomicInteger(0);
+    final AtomicInteger maxAddsInProgress = new AtomicInteger(0);
+    final AtomicInteger addsBlocked = new AtomicInteger(0);
+    final AtomicInteger readsInProgress = new AtomicInteger(0);
+    final AtomicInteger readsBlocked = new AtomicInteger(0);
+    final AtomicInteger maxReadsInProgress = new AtomicInteger(0);
+
+    @StatsDoc(
+        name = ADD_ENTRY_REQUEST,
+        help = "request stats of AddEntry on a bookie"
+    )
+    private final OpStatsLogger addRequestStats;
+    @StatsDoc(
+        name = ADD_ENTRY,
+        help = "operation stats of AddEntry on a bookie",
+        parent = ADD_ENTRY_REQUEST
+    )
+    private final OpStatsLogger addEntryStats;
+    @StatsDoc(
+        name = READ_ENTRY_REQUEST,
+        help = "request stats of ReadEntry on a bookie"
+    )
+    final OpStatsLogger readRequestStats;
+    @StatsDoc(
+        name = READ_ENTRY,
+        help = "operation stats of ReadEntry on a bookie",
+        parent = READ_ENTRY_REQUEST
+    )
+    final OpStatsLogger readEntryStats;
+    @StatsDoc(
+        name = FORCE_LEDGER,
+        help = "operation stats of ForceLedger on a bookie",
+        parent = FORCE_LEDGER_REQUEST
+    )
+    final OpStatsLogger forceLedgerStats;
+    @StatsDoc(
+        name = FORCE_LEDGER_REQUEST,
+        help = "request stats of ForceLedger on a bookie"
+    )
+    final OpStatsLogger forceLedgerRequestStats;
+    @StatsDoc(
+        name = READ_ENTRY_FENCE_REQUEST,
+        help = "request stats of FenceRead on a bookie"
+    )
+    final OpStatsLogger fenceReadRequestStats;
+    @StatsDoc(
+        name = READ_ENTRY_FENCE_READ,
+        help = "operation stats of FenceRead on a bookie",
+        parent = READ_ENTRY_FENCE_REQUEST,
+        happensAfter = READ_ENTRY_FENCE_WAIT
+    )
+    final OpStatsLogger fenceReadEntryStats;
+    @StatsDoc(
+        name = READ_ENTRY_FENCE_WAIT,
+        help = "operation stats of FenceReadWait on a bookie",
+        parent = READ_ENTRY_FENCE_REQUEST
+    )
+    final OpStatsLogger fenceReadWaitStats;
+    @StatsDoc(
+        name = READ_ENTRY_SCHEDULING_DELAY,
+        help = "operation stats of ReadEntry scheduling delays on a bookie"
+    )
+    final OpStatsLogger readEntrySchedulingDelayStats;
+    @StatsDoc(
+        name = READ_ENTRY_LONG_POLL_PRE_WAIT,
+        help = "operation stats of LongPoll Reads pre wait time on a bookie",
+        parent = READ_ENTRY_LONG_POLL_REQUEST
+    )
+    final OpStatsLogger longPollPreWaitStats;
+    @StatsDoc(
+        name = READ_ENTRY_LONG_POLL_WAIT,
+        help = "operation stats of LongPoll Reads wait time on a bookie",
+        happensAfter = READ_ENTRY_LONG_POLL_PRE_WAIT,
+        parent = READ_ENTRY_LONG_POLL_REQUEST
+    )
+    final OpStatsLogger longPollWaitStats;
+    @StatsDoc(
+        name = READ_ENTRY_LONG_POLL_READ,
+        help = "operation stats of LongPoll Reads on a bookie",
+        happensAfter = READ_ENTRY_LONG_POLL_WAIT,
+        parent = READ_ENTRY_LONG_POLL_REQUEST
+    )
+    final OpStatsLogger longPollReadStats;
+    @StatsDoc(
+        name = READ_ENTRY_LONG_POLL_REQUEST,
+        help = "request stats of LongPoll Reads on a bookie"
+    )
+    final OpStatsLogger longPollReadRequestStats;
+    @StatsDoc(
+        name = READ_LAST_ENTRY_NOENTRY_ERROR,
+        help = "total NOENTRY errors of reading last entry on a bookie"
+    )
+    final Counter readLastEntryNoEntryErrorCounter;
+    @StatsDoc(
+        name = WRITE_LAC_REQUEST,
+        help = "request stats of WriteLac on a bookie"
+    )
+    final OpStatsLogger writeLacRequestStats;
+    @StatsDoc(
+        name = WRITE_LAC,
+        help = "operation stats of WriteLac on a bookie",
+        parent = WRITE_LAC_REQUEST
+    )
+    final OpStatsLogger writeLacStats;
+    @StatsDoc(
+        name = READ_LAC_REQUEST,
+        help = "request stats of ReadLac on a bookie"
+    )
+    final OpStatsLogger readLacRequestStats;
+    @StatsDoc(
+        name = READ_LAC,
+        help = "operation stats of ReadLac on a bookie",
+        parent = READ_LAC_REQUEST
+    )
+    final OpStatsLogger readLacStats;
+    @StatsDoc(
+        name = GET_BOOKIE_INFO_REQUEST,
+        help = "request stats of GetBookieInfo on a bookie"
+    )
+    final OpStatsLogger getBookieInfoRequestStats;
+    @StatsDoc(
+        name = GET_BOOKIE_INFO,
+        help = "operation stats of GetBookieInfo on a bookie"
+    )
+    final OpStatsLogger getBookieInfoStats;
+    @StatsDoc(
+        name = CHANNEL_WRITE,
+        help = "channel write stats on a bookie"
+    )
+    final OpStatsLogger channelWriteStats;
+    @StatsDoc(
+        name = ADD_ENTRY_BLOCKED,
+        help = "operation stats of AddEntry blocked on a bookie"
+    )
+    final OpStatsLogger addEntryBlockedStats;
+    @StatsDoc(
+        name = READ_ENTRY_BLOCKED,
+        help = "operation stats of ReadEntry blocked on a bookie"
+    )
+    final OpStatsLogger readEntryBlockedStats;
+
+    public RequestStats(StatsLogger statsLogger) {
+        this.addEntryStats = statsLogger.getOpStatsLogger(ADD_ENTRY);
+        this.addRequestStats = statsLogger.getOpStatsLogger(ADD_ENTRY_REQUEST);
+        this.readEntryStats = statsLogger.getOpStatsLogger(READ_ENTRY);
+        this.forceLedgerStats = statsLogger.getOpStatsLogger(FORCE_LEDGER);
+        this.forceLedgerRequestStats = statsLogger.getOpStatsLogger(FORCE_LEDGER_REQUEST);
+        this.readRequestStats = statsLogger.getOpStatsLogger(READ_ENTRY_REQUEST);
+        this.fenceReadEntryStats = statsLogger.getOpStatsLogger(READ_ENTRY_FENCE_READ);
+        this.fenceReadRequestStats = statsLogger.getOpStatsLogger(READ_ENTRY_FENCE_REQUEST);
+        this.fenceReadWaitStats = statsLogger.getOpStatsLogger(READ_ENTRY_FENCE_WAIT);
+        this.readEntrySchedulingDelayStats = statsLogger.getOpStatsLogger(READ_ENTRY_SCHEDULING_DELAY);
+        this.longPollPreWaitStats = statsLogger.getOpStatsLogger(READ_ENTRY_LONG_POLL_PRE_WAIT);
+        this.longPollWaitStats = statsLogger.getOpStatsLogger(READ_ENTRY_LONG_POLL_WAIT);
+        this.longPollReadStats = statsLogger.getOpStatsLogger(READ_ENTRY_LONG_POLL_READ);
+        this.longPollReadRequestStats = statsLogger.getOpStatsLogger(READ_ENTRY_LONG_POLL_REQUEST);
+        this.readLastEntryNoEntryErrorCounter = statsLogger.getCounter(READ_LAST_ENTRY_NOENTRY_ERROR);
+        this.writeLacStats = statsLogger.getOpStatsLogger(WRITE_LAC);
+        this.writeLacRequestStats = statsLogger.getOpStatsLogger(WRITE_LAC_REQUEST);
+        this.readLacStats = statsLogger.getOpStatsLogger(READ_LAC);
+        this.readLacRequestStats = statsLogger.getOpStatsLogger(READ_LAC_REQUEST);
+        this.getBookieInfoStats = statsLogger.getOpStatsLogger(GET_BOOKIE_INFO);
+        this.getBookieInfoRequestStats = statsLogger.getOpStatsLogger(GET_BOOKIE_INFO_REQUEST);
+        this.channelWriteStats = statsLogger.getOpStatsLogger(CHANNEL_WRITE);
+
+        this.addEntryBlockedStats = statsLogger.getOpStatsLogger(ADD_ENTRY_BLOCKED_WAIT);
+        this.readEntryBlockedStats = statsLogger.getOpStatsLogger(READ_ENTRY_BLOCKED_WAIT);
+
+        statsLogger.registerGauge(ADD_ENTRY_IN_PROGRESS, new Gauge<Number>() {
+            @Override
+            public Number getDefaultValue() {
+                return 0;
+            }
+
+            @Override
+            public Number getSample() {
+                return addsInProgress;
+            }
+        });
+
+        statsLogger.registerGauge(ADD_ENTRY_BLOCKED, new Gauge<Number>() {
+            @Override
+            public Number getDefaultValue() {
+                return 0;
+            }
+
+            @Override
+            public Number getSample() {
+                return addsBlocked;
+            }
+        });
+
+        statsLogger.registerGauge(READ_ENTRY_IN_PROGRESS, new Gauge<Number>() {
+            @Override
+            public Number getDefaultValue() {
+                return 0;
+            }
+
+            @Override
+            public Number getSample() {
+                return readsInProgress;
+            }
+        });
+
+        statsLogger.registerGauge(READ_ENTRY_BLOCKED, new Gauge<Number>() {
+            @Override
+            public Number getDefaultValue() {
+                return 0;
+            }
+
+            @Override
+            public Number getSample() {
+                return readsBlocked;
+            }
+        });
+    }
+
+    //
+    // Add requests
+    //
+
+    void blockAddRequest() {
+        addsBlocked.incrementAndGet();
+    }
+
+    void unblockAddRequest(long delayNanos) {
+        addEntryBlockedStats.registerSuccessfulEvent(delayNanos, TimeUnit.NANOSECONDS);
+        addsBlocked.decrementAndGet();
+    }
+
+    void trackAddRequest() {
+        final int curr = addsInProgress.incrementAndGet();
+        maxAddsInProgress.accumulateAndGet(curr, Integer::max);
+    }
+
+    void untrackAddRequest() {
+        addsInProgress.decrementAndGet();
+    }
+
+    int maxAddsInProgressCount() {
+        return maxAddsInProgress.get();
+    }
+
+    //
+    // Read requests
+    //
+
+    void blockReadRequest() {
+        readsBlocked.incrementAndGet();
+    }
+
+    void unblockReadRequest(long delayNanos) {
+        readEntryBlockedStats.registerSuccessfulEvent(delayNanos, TimeUnit.NANOSECONDS);
+        readsBlocked.decrementAndGet();
+    }
+
+    void trackReadRequest() {
+        final int curr = readsInProgress.incrementAndGet();
+        maxReadsInProgress.accumulateAndGet(curr, Integer::max);
+    }
+
+    void untrackReadRequest() {
+        readsInProgress.decrementAndGet();
+    }
+
+    int maxReadsInProgressCount() {
+        return maxReadsInProgress.get();
+    }
+
+}
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/WriteEntryProcessor.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/WriteEntryProcessor.java
index f5af75a..70db7ce 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/WriteEntryProcessor.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/WriteEntryProcessor.java
@@ -63,7 +63,7 @@ class WriteEntryProcessor extends PacketProcessorBase<ParsedAddRequest> implemen
                     + " so rejecting the request from the client!");
             sendResponse(BookieProtocol.EREADONLY,
                          ResponseBuilder.buildErrorResponse(BookieProtocol.EREADONLY, request),
-                         requestProcessor.getAddRequestStats());
+                         requestProcessor.getRequestStats().getAddRequestStats());
             request.release();
             request.recycle();
             return;
@@ -104,11 +104,11 @@ class WriteEntryProcessor extends PacketProcessorBase<ParsedAddRequest> implemen
         }
 
         if (rc != BookieProtocol.EOK) {
-            requestProcessor.getAddEntryStats().registerFailedEvent(MathUtils.elapsedNanos(startTimeNanos),
-                    TimeUnit.NANOSECONDS);
+            requestProcessor.getRequestStats().getAddEntryStats()
+                .registerFailedEvent(MathUtils.elapsedNanos(startTimeNanos), TimeUnit.NANOSECONDS);
             sendResponse(rc,
                          ResponseBuilder.buildErrorResponse(rc, request),
-                         requestProcessor.getAddRequestStats());
+                         requestProcessor.getRequestStats().getAddRequestStats());
             request.recycle();
         }
     }
@@ -117,15 +117,15 @@ class WriteEntryProcessor extends PacketProcessorBase<ParsedAddRequest> implemen
     public void writeComplete(int rc, long ledgerId, long entryId,
                               BookieSocketAddress addr, Object ctx) {
         if (BookieProtocol.EOK == rc) {
-            requestProcessor.getAddEntryStats().registerSuccessfulEvent(MathUtils.elapsedNanos(startTimeNanos),
-                    TimeUnit.NANOSECONDS);
+            requestProcessor.getRequestStats().getAddEntryStats()
+                .registerSuccessfulEvent(MathUtils.elapsedNanos(startTimeNanos), TimeUnit.NANOSECONDS);
         } else {
-            requestProcessor.getAddEntryStats().registerFailedEvent(MathUtils.elapsedNanos(startTimeNanos),
-                    TimeUnit.NANOSECONDS);
+            requestProcessor.getRequestStats().getAddEntryStats()
+                .registerFailedEvent(MathUtils.elapsedNanos(startTimeNanos), TimeUnit.NANOSECONDS);
         }
         sendResponse(rc,
                      ResponseBuilder.buildAddResponse(request),
-                     requestProcessor.getAddRequestStats());
+                     requestProcessor.getRequestStats().getAddRequestStats());
         request.recycle();
         recycle();
     }
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/WriteEntryProcessorV3.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/WriteEntryProcessorV3.java
index 7747e5c..c8ea067 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/WriteEntryProcessorV3.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/WriteEntryProcessorV3.java
@@ -80,11 +80,11 @@ class WriteEntryProcessorV3 extends PacketProcessorBaseV3 {
             public void writeComplete(int rc, long ledgerId, long entryId,
                                       BookieSocketAddress addr, Object ctx) {
                 if (BookieProtocol.EOK == rc) {
-                    requestProcessor.getAddEntryStats().registerSuccessfulEvent(MathUtils.elapsedNanos(startTimeNanos),
-                            TimeUnit.NANOSECONDS);
+                    requestProcessor.getRequestStats().getAddEntryStats()
+                        .registerSuccessfulEvent(MathUtils.elapsedNanos(startTimeNanos), TimeUnit.NANOSECONDS);
                 } else {
-                    requestProcessor.getAddEntryStats().registerFailedEvent(MathUtils.elapsedNanos(startTimeNanos),
-                            TimeUnit.NANOSECONDS);
+                    requestProcessor.getRequestStats().getAddEntryStats()
+                        .registerFailedEvent(MathUtils.elapsedNanos(startTimeNanos), TimeUnit.NANOSECONDS);
                 }
 
                 StatusCode status;
@@ -105,7 +105,7 @@ class WriteEntryProcessorV3 extends PacketProcessorBaseV3 {
                         .setStatus(addResponse.getStatus())
                         .setAddResponse(addResponse);
                 Response resp = response.build();
-                sendResponse(status, resp, requestProcessor.getAddRequestStats());
+                sendResponse(status, resp, requestProcessor.getRequestStats().getAddRequestStats());
             }
         };
         final EnumSet<WriteFlag> writeFlags;
@@ -171,7 +171,7 @@ class WriteEntryProcessorV3 extends PacketProcessorBaseV3 {
                     .setAddResponse(addResponse);
             Response resp = response.build();
             sendResponse(addResponse.getStatus(), resp,
-                         requestProcessor.getAddRequestStats());
+                         requestProcessor.getRequestStats().getAddRequestStats());
         }
     }
 
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/WriteLacProcessorV3.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/WriteLacProcessorV3.java
index 7e42a73..691102b 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/WriteLacProcessorV3.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/WriteLacProcessorV3.java
@@ -71,11 +71,11 @@ class WriteLacProcessorV3 extends PacketProcessorBaseV3 implements Runnable {
             @Override
             public void writeComplete(int rc, long ledgerId, long entryId, BookieSocketAddress addr, Object ctx) {
                 if (BookieProtocol.EOK == rc) {
-                    requestProcessor.writeLacStats.registerSuccessfulEvent(MathUtils.elapsedNanos(startTimeNanos),
-                            TimeUnit.NANOSECONDS);
+                    requestProcessor.getRequestStats().getWriteLacStats()
+                        .registerSuccessfulEvent(MathUtils.elapsedNanos(startTimeNanos), TimeUnit.NANOSECONDS);
                 } else {
-                    requestProcessor.writeLacStats.registerFailedEvent(MathUtils.elapsedNanos(startTimeNanos),
-                            TimeUnit.NANOSECONDS);
+                    requestProcessor.getRequestStats().getWriteLacStats()
+                        .registerFailedEvent(MathUtils.elapsedNanos(startTimeNanos), TimeUnit.NANOSECONDS);
                 }
 
                 StatusCode status;
@@ -96,7 +96,7 @@ class WriteLacProcessorV3 extends PacketProcessorBaseV3 implements Runnable {
                         .setStatus(writeLacResponse.getStatus())
                         .setWriteLacResponse(writeLacResponse);
                 Response resp = response.build();
-                sendResponse(status, resp, requestProcessor.writeLacRequestStats);
+                sendResponse(status, resp, requestProcessor.getRequestStats().getWriteLacRequestStats());
             }
         };
 
@@ -130,8 +130,8 @@ class WriteLacProcessorV3 extends PacketProcessorBaseV3 implements Runnable {
         // If everything is okay, we return null so that the calling function
         // dosn't return a response back to the caller.
         if (!status.equals(StatusCode.EOK)) {
-            requestProcessor.writeLacStats.registerFailedEvent(MathUtils.elapsedNanos(startTimeNanos),
-                    TimeUnit.NANOSECONDS);
+            requestProcessor.getRequestStats().getWriteLacStats()
+                .registerFailedEvent(MathUtils.elapsedNanos(startTimeNanos), TimeUnit.NANOSECONDS);
             writeLacResponse.setStatus(status);
             return writeLacResponse.build();
         }
@@ -147,7 +147,10 @@ class WriteLacProcessorV3 extends PacketProcessorBaseV3 implements Runnable {
                     .setStatus(writeLacResponse.getStatus())
                     .setWriteLacResponse(writeLacResponse);
             Response resp = response.build();
-            sendResponse(writeLacResponse.getStatus(), resp, requestProcessor.writeLacRequestStats);
+            sendResponse(
+                writeLacResponse.getStatus(),
+                resp,
+                requestProcessor.getRequestStats().getWriteLacRequestStats());
         }
     }
 
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/ForceLedgerProcessorV3Test.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/ForceLedgerProcessorV3Test.java
index 37d4647..bab83fb 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/ForceLedgerProcessorV3Test.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/ForceLedgerProcessorV3Test.java
@@ -75,10 +75,7 @@ public class ForceLedgerProcessorV3Test {
         requestProcessor = mock(BookieRequestProcessor.class);
         when(requestProcessor.getBookie()).thenReturn(bookie);
         when(requestProcessor.getWaitTimeoutOnBackpressureMillis()).thenReturn(-1L);
-        when(requestProcessor.getForceLedgerStats())
-            .thenReturn(NullStatsLogger.INSTANCE.getOpStatsLogger("force_ledger"));
-        when(requestProcessor.getForceLedgerRequestStats())
-            .thenReturn(NullStatsLogger.INSTANCE.getOpStatsLogger("force_ledger_request"));
+        when(requestProcessor.getRequestStats()).thenReturn(new RequestStats(NullStatsLogger.INSTANCE));
         processor = new ForceLedgerProcessorV3(
             request,
             channel,
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/WriteEntryProcessorTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/WriteEntryProcessorTest.java
index 5901c2f..bbcffea 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/WriteEntryProcessorTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/WriteEntryProcessorTest.java
@@ -67,10 +67,7 @@ public class WriteEntryProcessorTest {
         bookie = mock(Bookie.class);
         requestProcessor = mock(BookieRequestProcessor.class);
         when(requestProcessor.getBookie()).thenReturn(bookie);
-        when(requestProcessor.getAddEntryStats())
-            .thenReturn(NullStatsLogger.INSTANCE.getOpStatsLogger("add_entry"));
-        when(requestProcessor.getAddRequestStats())
-            .thenReturn(NullStatsLogger.INSTANCE.getOpStatsLogger("add_requests"));
+        when(requestProcessor.getRequestStats()).thenReturn(new RequestStats(NullStatsLogger.INSTANCE));
         processor = WriteEntryProcessor.create(
             request,
             channel,
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/WriteEntryProcessorV3Test.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/WriteEntryProcessorV3Test.java
index df7b153..292dc51 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/WriteEntryProcessorV3Test.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/WriteEntryProcessorV3Test.java
@@ -80,12 +80,7 @@ public class WriteEntryProcessorV3Test {
         requestProcessor = mock(BookieRequestProcessor.class);
         when(requestProcessor.getBookie()).thenReturn(bookie);
         when(requestProcessor.getWaitTimeoutOnBackpressureMillis()).thenReturn(-1L);
-        when(requestProcessor.getAddEntryStats())
-            .thenReturn(NullStatsLogger.INSTANCE.getOpStatsLogger("add_entry"));
-        when(requestProcessor.getAddRequestStats())
-            .thenReturn(NullStatsLogger.INSTANCE.getOpStatsLogger("add_requests"));
-        when(requestProcessor.getChannelWriteStats())
-                .thenReturn(NullStatsLogger.INSTANCE.getOpStatsLogger("CHANNEL_WRITE"));
+        when(requestProcessor.getRequestStats()).thenReturn(new RequestStats(NullStatsLogger.INSTANCE));
         processor = new WriteEntryProcessorV3(
             request,
             channel,
diff --git a/bookkeeper-stats/src/main/java/org/apache/bookkeeper/stats/annotations/StatsDoc.java b/bookkeeper-stats/src/main/java/org/apache/bookkeeper/stats/annotations/StatsDoc.java
index 97f487a..d2ca8c6 100644
--- a/bookkeeper-stats/src/main/java/org/apache/bookkeeper/stats/annotations/StatsDoc.java
+++ b/bookkeeper-stats/src/main/java/org/apache/bookkeeper/stats/annotations/StatsDoc.java
@@ -58,5 +58,25 @@ public @interface StatsDoc {
      */
     String help();
 
+    /**
+     * The parent metric name.
+     *
+     * <p>It can be used for analyzing the relationships
+     * between the metrics, especially for the latency metrics.
+     *
+     * @return the parent metric name
+     */
+    String parent() default "";
+
+    /**
+     * The metric name of an operation that happens
+     * before the operation of this metric.
+     *
+     * <p>Similar to {@link #parent()}, it can be used for analyzing
+     * the dependencies between metrics.
+     *
+     * @return the metric name of an operation that happens before the operation of this metric.
+     */
+    String happensAfter() default "";
 
 }