You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@bookkeeper.apache.org by ch...@apache.org on 2023/03/06 01:03:04 UTC

[bookkeeper] branch master updated: Simplified the logic for ForceWriteThread after we introduced queue.drainTo() (#3830)

This is an automated email from the ASF dual-hosted git repository.

chenhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git


The following commit(s) were added to refs/heads/master by this push:
     new 128c52eeff Simplified the logic for ForceWriteThread after we introduced queue.drainTo() (#3830)
128c52eeff is described below

commit 128c52eeffeb0d009aedc3b92e8bd30852332a09
Author: Matteo Merli <mm...@apache.org>
AuthorDate: Sun Mar 5 17:02:57 2023 -0800

    Simplified the logic for ForceWriteThread after we introduced queue.drainTo() (#3830)
    
    ### Motivation
    
    In #3545 we switched the `ForceWriteThread` to take advantage of the `BlockingQueue.drainTo()` method to reduce contention, though the core logic of the force-write was not touched at the time.
    
    The logic of force-write is quite complicated because it tries to group multiple force-write requests in the queue by sending a new marker and grouping them when the marker is received. This also leads to a bit of lag when there are many requests coming in and the IO is stressed, as we're waiting a bit more before issuing the fsync.
    
    Instead, with the `drainTo()` approach we can greatly simplify the logic and maintain a strict fsync grouping:
     1. drain all the force-write-requests available in the queue into a local array list
     2. perform the fsync
     3. update the journal log mark to the position of the last fw request
     4. trigger send-responses for all the requests
     5. go back to read from the queue
    
    
    This refactoring will also enable further improvements to how the responses are prepared and sent, since we now have a list of responses ready to send.
---
 .../bookkeeper/bookie/BookKeeperServerStats.java   |   4 -
 .../java/org/apache/bookkeeper/bookie/Journal.java | 186 +++++----------------
 .../bookkeeper/bookie/stats/JournalStats.java      |  30 ----
 .../org/apache/bookkeeper/test/OpStatTest.java     |   2 -
 4 files changed, 45 insertions(+), 177 deletions(-)

diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookKeeperServerStats.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookKeeperServerStats.java
index 888577d458..d4657d2036 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookKeeperServerStats.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookKeeperServerStats.java
@@ -109,7 +109,6 @@ public interface BookKeeperServerStats {
     String JOURNAL_FORCE_WRITE_ENQUEUE = "JOURNAL_FORCE_WRITE_ENQUEUE";
     String JOURNAL_FORCE_WRITE_BATCH_ENTRIES = "JOURNAL_FORCE_WRITE_BATCH_ENTRIES";
     String JOURNAL_FORCE_WRITE_BATCH_BYTES = "JOURNAL_FORCE_WRITE_BATCH_BYTES";
-    String JOURNAL_FORCE_WRITE_GROUPING_FAILURES = "JOURNAL_FORCE_WRITE_GROUPING_FAILURES";
     String JOURNAL_FLUSH_LATENCY = "JOURNAL_FLUSH_LATENCY";
     String JOURNAL_QUEUE_LATENCY = "JOURNAL_QUEUE_LATENCY";
     String JOURNAL_QUEUE_MAX_SIZE = "JOURNAL_QUEUE_MAX_SIZE";
@@ -158,9 +157,6 @@ public interface BookKeeperServerStats {
     String INDEX_INMEM_ILLEGAL_STATE_RESET = "INDEX_INMEM_ILLEGAL_STATE_RESET";
     String INDEX_INMEM_ILLEGAL_STATE_DELETE = "INDEX_INMEM_ILLEGAL_STATE_DELETE";
     String JOURNAL_FORCE_WRITE_QUEUE_SIZE = "JOURNAL_FORCE_WRITE_QUEUE_SIZE";
-    String JOURNAL_CB_QUEUE_SIZE = "JOURNAL_CB_QUEUE_SIZE";
-    String CB_THREAD_POOL_QUEUE_SIZE = "CB_THREAD_POOL_QUEUE_SIZE";
-    String JOURNAL_CB_QUEUED_LATENCY = "JOURNAL_CB_QUEUED_LATENCY";
     String JOURNAL_NUM_FLUSH_EMPTY_QUEUE = "JOURNAL_NUM_FLUSH_EMPTY_QUEUE";
     String JOURNAL_NUM_FLUSH_MAX_OUTSTANDING_BYTES = "JOURNAL_NUM_FLUSH_MAX_OUTSTANDING_BYTES";
     String JOURNAL_NUM_FLUSH_MAX_WAIT = "JOURNAL_NUM_FLUSH_MAX_WAIT";
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java
index a71d4fb2c8..d14d5cb8a5 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java
@@ -299,19 +299,14 @@ public class Journal extends BookieCriticalThread implements CheckpointSource {
         WriteCallback cb;
         Object ctx;
         long enqueueTime;
-        long enqueueCbThreadPooleQueueTime;
         boolean ackBeforeSync;
 
         OpStatsLogger journalAddEntryStats;
-        OpStatsLogger journalCbQueuedLatency;
-        Counter journalCbQueueSize;
-        Counter cbThreadPoolQueueSize;
         Counter callbackTime;
 
         static QueueEntry create(ByteBuf entry, boolean ackBeforeSync, long ledgerId, long entryId,
                 WriteCallback cb, Object ctx, long enqueueTime, OpStatsLogger journalAddEntryStats,
-                Counter journalCbQueueSize, Counter cbThreadPoolQueueSize,
-                OpStatsLogger journalCbQueuedLatency, Counter callbackTime) {
+                Counter callbackTime) {
             QueueEntry qe = RECYCLER.get();
             qe.entry = entry;
             qe.ackBeforeSync = ackBeforeSync;
@@ -321,27 +316,16 @@ public class Journal extends BookieCriticalThread implements CheckpointSource {
             qe.entryId = entryId;
             qe.enqueueTime = enqueueTime;
             qe.journalAddEntryStats = journalAddEntryStats;
-            qe.journalCbQueuedLatency = journalCbQueuedLatency;
-            qe.journalCbQueueSize = journalCbQueueSize;
-            qe.cbThreadPoolQueueSize = cbThreadPoolQueueSize;
             qe.callbackTime = callbackTime;
             return qe;
         }
 
-        public void setEnqueueCbThreadPooleQueueTime(long enqueueCbThreadPooleQueueTime) {
-            this.enqueueCbThreadPooleQueueTime = enqueueCbThreadPooleQueueTime;
-        }
-
         @Override
         public void run() {
-            journalCbQueuedLatency.registerSuccessfulEvent(
-                    MathUtils.elapsedNanos(enqueueCbThreadPooleQueueTime), TimeUnit.NANOSECONDS);
             long startTime = System.nanoTime();
             if (LOG.isDebugEnabled()) {
                 LOG.debug("Acknowledge Ledger: {}, Entry: {}", ledgerId, entryId);
             }
-            journalCbQueueSize.dec();
-            cbThreadPoolQueueSize.dec();
             journalAddEntryStats.registerSuccessfulEvent(MathUtils.elapsedNanos(enqueueTime), TimeUnit.NANOSECONDS);
             cb.writeComplete(0, ledgerId, entryId, null, ctx);
             callbackTime.addLatency(MathUtils.elapsedNanos(startTime), TimeUnit.NANOSECONDS);
@@ -366,9 +350,6 @@ public class Journal extends BookieCriticalThread implements CheckpointSource {
             this.cb = null;
             this.ctx = null;
             this.journalAddEntryStats = null;
-            this.journalCbQueuedLatency = null;
-            this.journalCbQueueSize = null;
-            this.cbThreadPoolQueueSize = null;
             this.callbackTime = null;
             recyclerHandle.recycle(this);
         }
@@ -378,51 +359,25 @@ public class Journal extends BookieCriticalThread implements CheckpointSource {
      * Token which represents the need to force a write to the Journal.
      */
     @VisibleForTesting
-    public class ForceWriteRequest {
+    public static class ForceWriteRequest {
         private JournalChannel logFile;
         private RecyclableArrayList<QueueEntry> forceWriteWaiters;
         private boolean shouldClose;
-        private boolean isMarker;
         private long lastFlushedPosition;
         private long logId;
-        private long enqueueTime;
 
-        public int process(boolean shouldForceWrite) throws IOException {
-            journalStats.getForceWriteQueueSize().dec();
-            journalStats.getFwEnqueueTimeStats()
-                .registerSuccessfulEvent(MathUtils.elapsedNanos(enqueueTime), TimeUnit.NANOSECONDS);
-
-            if (isMarker) {
-                return 0;
-            }
+        public int process() {
+            closeFileIfNecessary();
 
-            long startTime = MathUtils.nowInNano();
-            try {
-                if (shouldForceWrite) {
-                    this.logFile.forceWrite(false);
-                    journalStats.getJournalSyncStats()
-                        .registerSuccessfulEvent(MathUtils.elapsedNanos(startTime), TimeUnit.NANOSECONDS);
-                }
-                lastLogMark.setCurLogMark(this.logId, this.lastFlushedPosition);
-
-                // Notify the waiters that the force write succeeded
-                for (int i = 0; i < forceWriteWaiters.size(); i++) {
-                    QueueEntry qe = forceWriteWaiters.get(i);
-                    if (qe != null) {
-                        qe.setEnqueueCbThreadPooleQueueTime(MathUtils.nowInNano());
-                        journalStats.getCbThreadPoolQueueSize().inc();
-                        qe.run();
-                    }
+            // Notify the waiters that the force write succeeded
+            for (int i = 0; i < forceWriteWaiters.size(); i++) {
+                QueueEntry qe = forceWriteWaiters.get(i);
+                if (qe != null) {
+                    qe.run();
                 }
-
-                return forceWriteWaiters.size();
-            } catch (IOException e) {
-                journalStats.getJournalSyncStats()
-                        .registerFailedEvent(MathUtils.elapsedNanos(startTime), TimeUnit.NANOSECONDS);
-                throw e;
-            } finally {
-                closeFileIfNecessary();
             }
+
+            return forceWriteWaiters.size();
         }
 
         public void closeFileIfNecessary() {
@@ -431,6 +386,7 @@ public class Journal extends BookieCriticalThread implements CheckpointSource {
                 // We should guard against exceptions so its
                 // safe to call in catch blocks
                 try {
+                    logFile.forceWrite(false);
                     logFile.close();
                     // Call close only once
                     shouldClose = false;
@@ -460,21 +416,18 @@ public class Journal extends BookieCriticalThread implements CheckpointSource {
                           long logId,
                           long lastFlushedPosition,
                           RecyclableArrayList<QueueEntry> forceWriteWaiters,
-                          boolean shouldClose,
-                          boolean isMarker) {
+                          boolean shouldClose) {
         ForceWriteRequest req = forceWriteRequestsRecycler.get();
         req.forceWriteWaiters = forceWriteWaiters;
         req.logFile = logFile;
         req.logId = logId;
         req.lastFlushedPosition = lastFlushedPosition;
         req.shouldClose = shouldClose;
-        req.isMarker = isMarker;
-        req.enqueueTime = MathUtils.nowInNano();
         journalStats.getForceWriteQueueSize().inc();
         return req;
     }
 
-    private final Recycler<ForceWriteRequest> forceWriteRequestsRecycler = new Recycler<ForceWriteRequest>() {
+    private static final Recycler<ForceWriteRequest> forceWriteRequestsRecycler = new Recycler<ForceWriteRequest>() {
                 @Override
                 protected ForceWriteRequest newObject(
                         Recycler.Handle<ForceWriteRequest> handle) {
@@ -495,10 +448,6 @@ public class Journal extends BookieCriticalThread implements CheckpointSource {
         private final boolean enableGroupForceWrites;
         private final Counter forceWriteThreadTime;
 
-        boolean shouldForceWrite = true;
-        int numReqInLastForceWrite = 0;
-        boolean forceWriteMarkerSent = false;
-
         public ForceWriteThread(Thread threadToNotifyOnEx,
                                 boolean enableGroupForceWrites,
                                 StatsLogger statsLogger) {
@@ -520,12 +469,11 @@ public class Journal extends BookieCriticalThread implements CheckpointSource {
                 }
             }
 
-            long busyStartTime = System.nanoTime();
-
-            List<ForceWriteRequest> localRequests = new ArrayList<>();
+            final List<ForceWriteRequest> localRequests = new ArrayList<>();
 
             while (running) {
                 try {
+                    int numReqInLastForceWrite = 0;
 
                     int requestsCount = forceWriteRequests.drainTo(localRequests);
                     if (requestsCount == 0) {
@@ -534,11 +482,26 @@ public class Journal extends BookieCriticalThread implements CheckpointSource {
                         requestsCount = 1;
                     }
 
+                    journalStats.getForceWriteQueueSize().addCount(-requestsCount);
+
+                    // Sync and mark the journal up to the position of the last entry in the batch
+                    ForceWriteRequest lastRequest = localRequests.get(requestsCount - 1);
+                    syncJournal(lastRequest);
+
+                    // All the requests in the batch are now fully-synced. We can trigger sending the
+                    // responses
                     for (int i = 0; i < requestsCount; i++) {
-                        forceWriteThreadTime.addLatency(MathUtils.elapsedNanos(busyStartTime), TimeUnit.NANOSECONDS);
-                        processForceWriteRequest(localRequests.get(i));
-                        busyStartTime = System.nanoTime();
+                        ForceWriteRequest req = localRequests.get(i);
+                        numReqInLastForceWrite += req.process();
+                        req.recycle();
                     }
+
+                    journalStats.getForceWriteGroupingCountStats()
+                            .registerSuccessfulValue(numReqInLastForceWrite);
+
+                } catch (IOException ioe) {
+                    LOG.error("I/O exception in ForceWrite thread", ioe);
+                    running = false;
                 } catch (InterruptedException e) {
                     Thread.currentThread().interrupt();
                     LOG.info("ForceWrite thread interrupted");
@@ -559,66 +522,17 @@ public class Journal extends BookieCriticalThread implements CheckpointSource {
             threadToNotifyOnEx.interrupt();
         }
 
-        private void processForceWriteRequest(ForceWriteRequest req) {
+        private void syncJournal(ForceWriteRequest lastRequest) throws IOException {
+            long fsyncStartTime = MathUtils.nowInNano();
             try {
-                // Force write the file and then notify the write completions
-                //
-                if (!req.isMarker) {
-                    if (shouldForceWrite) {
-                        // if we are going to force write, any request that is already in the
-                        // queue will benefit from this force write - post a marker prior to issuing
-                        // the flush so until this marker is encountered we can skip the force write
-                        if (enableGroupForceWrites) {
-                            ForceWriteRequest marker =
-                                    createForceWriteRequest(req.logFile, 0, 0, null, false, true);
-                            forceWriteMarkerSent = forceWriteRequests.offer(marker);
-                            if (!forceWriteMarkerSent) {
-                                marker.recycle();
-                                Counter failures = journalStats.getForceWriteGroupingFailures();
-                                failures.inc();
-                                LOG.error(
-                                        "Fail to send force write grouping marker,"
-                                                + " Journal.forceWriteRequests queue(capacity {}) is full,"
-                                                + " current failure counter is {}.",
-                                        conf.getJournalQueueSize(), failures.get());
-                            }
-                        }
-
-                        // If we are about to issue a write, record the number of requests in
-                        // the last force write and then reset the counter so we can accumulate
-                        // requests in the write we are about to issue
-                        if (numReqInLastForceWrite > 0) {
-                            journalStats.getForceWriteGroupingCountStats()
-                                    .registerSuccessfulValue(numReqInLastForceWrite);
-                            numReqInLastForceWrite = 0;
-                        }
-                    }
-                }
-                numReqInLastForceWrite += req.process(shouldForceWrite);
-
-                if (enableGroupForceWrites
-                        // if its a marker we should switch back to flushing
-                        && !req.isMarker
-                        // If group marker sending failed, we can't figure out which writes are
-                        // grouped in this force write. So, abandon it even if other writes could
-                        // be grouped. This should be extremely rare as, usually, queue size is
-                        // large enough to accommodate high flush frequencies.
-                        && forceWriteMarkerSent
-                        // This indicates that this is the last request in a given file
-                        // so subsequent requests will go to a different file so we should
-                        // flush on the next request
-                        && !req.shouldClose) {
-                    shouldForceWrite = false;
-                } else {
-                    shouldForceWrite = true;
-                }
-             } catch (IOException ioe) {
-                LOG.error("I/O exception in ForceWrite thread", ioe);
-                running = false;
-            } finally {
-                if (req != null) {
-                    req.recycle();
-                }
+                lastRequest.logFile.forceWrite(false);
+                journalStats.getJournalSyncStats().registerSuccessfulEvent(MathUtils.elapsedNanos(fsyncStartTime),
+                        TimeUnit.NANOSECONDS);
+                lastLogMark.setCurLogMark(lastRequest.logId, lastRequest.lastFlushedPosition);
+            } catch (IOException ioe) {
+                journalStats.getJournalSyncStats()
+                        .registerFailedEvent(MathUtils.elapsedNanos(fsyncStartTime), TimeUnit.NANOSECONDS);
+                throw ioe;
             }
         }
 
@@ -972,16 +886,12 @@ public class Journal extends BookieCriticalThread implements CheckpointSource {
         entry.retain();
 
         journalStats.getJournalQueueSize().inc();
-        journalStats.getJournalCbQueueSize().inc();
 
         memoryLimitController.reserveMemory(entry.readableBytes());
 
         queue.put(QueueEntry.create(
                 entry, ackBeforeSync,  ledgerId, entryId, cb, ctx, MathUtils.nowInNano(),
                 journalStats.getJournalAddEntryStats(),
-                journalStats.getJournalCbQueueSize(),
-                journalStats.getCbThreadPoolQueueSize(),
-                journalStats.getJournalCbQueuedLatency(),
                 callbackTime));
     }
 
@@ -990,13 +900,9 @@ public class Journal extends BookieCriticalThread implements CheckpointSource {
                 null, false /* ackBeforeSync */, ledgerId,
                 BookieImpl.METAENTRY_ID_FORCE_LEDGER, cb, ctx, MathUtils.nowInNano(),
                 journalStats.getJournalForceLedgerStats(),
-                journalStats.getJournalCbQueueSize(),
-                journalStats.getCbThreadPoolQueueSize(),
-                journalStats.getJournalCbQueuedLatency(),
                 callbackTime));
         // Increment afterwards because the add operation could fail.
         journalStats.getJournalQueueSize().inc();
-        journalStats.getJournalCbQueueSize().inc();
     }
 
     /**
@@ -1185,8 +1091,6 @@ public class Journal extends BookieCriticalThread implements CheckpointSource {
                                 if (entry != null && (!syncData || entry.ackBeforeSync)) {
                                     toFlush.set(i, null);
                                     numEntriesToFlush--;
-                                    entry.setEnqueueCbThreadPooleQueueTime(MathUtils.nowInNano());
-                                    journalStats.getCbThreadPoolQueueSize().inc();
                                     entry.run();
                                 }
                             }
@@ -1225,7 +1129,7 @@ public class Journal extends BookieCriticalThread implements CheckpointSource {
                                     || (System.currentTimeMillis() - lastFlushTimeMs
                                     >= journalPageCacheFlushIntervalMSec)) {
                                 forceWriteRequests.put(createForceWriteRequest(logFile, logId, lastFlushPosition,
-                                        toFlush, shouldRolloverJournal, false));
+                                        toFlush, shouldRolloverJournal));
                                 lastFlushTimeMs = System.currentTimeMillis();
                             }
                             toFlush = entryListRecycler.newInstance();
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/stats/JournalStats.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/stats/JournalStats.java
index ed2aab9524..2396acb1f7 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/stats/JournalStats.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/stats/JournalStats.java
@@ -21,11 +21,8 @@ package org.apache.bookkeeper.bookie.stats;
 
 import static org.apache.bookkeeper.bookie.BookKeeperServerStats.ADD_ENTRY;
 import static org.apache.bookkeeper.bookie.BookKeeperServerStats.CATEGORY_SERVER;
-import static org.apache.bookkeeper.bookie.BookKeeperServerStats.CB_THREAD_POOL_QUEUE_SIZE;
 import static org.apache.bookkeeper.bookie.BookKeeperServerStats.FORCE_LEDGER;
 import static org.apache.bookkeeper.bookie.BookKeeperServerStats.JOURNAL_ADD_ENTRY;
-import static org.apache.bookkeeper.bookie.BookKeeperServerStats.JOURNAL_CB_QUEUED_LATENCY;
-import static org.apache.bookkeeper.bookie.BookKeeperServerStats.JOURNAL_CB_QUEUE_SIZE;
 import static org.apache.bookkeeper.bookie.BookKeeperServerStats.JOURNAL_CREATION_LATENCY;
 import static org.apache.bookkeeper.bookie.BookKeeperServerStats.JOURNAL_FLUSH_LATENCY;
 import static org.apache.bookkeeper.bookie.BookKeeperServerStats.JOURNAL_FORCE_LEDGER;
@@ -33,7 +30,6 @@ import static org.apache.bookkeeper.bookie.BookKeeperServerStats.JOURNAL_FORCE_W
 import static org.apache.bookkeeper.bookie.BookKeeperServerStats.JOURNAL_FORCE_WRITE_BATCH_ENTRIES;
 import static org.apache.bookkeeper.bookie.BookKeeperServerStats.JOURNAL_FORCE_WRITE_ENQUEUE;
 import static org.apache.bookkeeper.bookie.BookKeeperServerStats.JOURNAL_FORCE_WRITE_GROUPING_COUNT;
-import static org.apache.bookkeeper.bookie.BookKeeperServerStats.JOURNAL_FORCE_WRITE_GROUPING_FAILURES;
 import static org.apache.bookkeeper.bookie.BookKeeperServerStats.JOURNAL_FORCE_WRITE_QUEUE_SIZE;
 import static org.apache.bookkeeper.bookie.BookKeeperServerStats.JOURNAL_MEMORY_MAX;
 import static org.apache.bookkeeper.bookie.BookKeeperServerStats.JOURNAL_MEMORY_USED;
@@ -134,11 +130,6 @@ public class JournalStats {
         help = "The distribution of number of bytes grouped together into a force write request"
     )
     private final OpStatsLogger forceWriteBatchBytesStats;
-    @StatsDoc(
-        name = JOURNAL_FORCE_WRITE_GROUPING_FAILURES,
-        help = "The number of force write grouping failures"
-    )
-    private final Counter forceWriteGroupingFailures;
     @StatsDoc(
         name = JOURNAL_QUEUE_SIZE,
         help = "The journal queue size"
@@ -149,23 +140,6 @@ public class JournalStats {
         help = "The force write queue size"
     )
     private final Counter forceWriteQueueSize;
-    @StatsDoc(
-        name = JOURNAL_CB_QUEUE_SIZE,
-        help = "The journal callback queue size"
-    )
-    private final Counter journalCbQueueSize;
-
-    @StatsDoc(
-            name = CB_THREAD_POOL_QUEUE_SIZE,
-            help = "The queue size of cbThreadPool"
-    )
-    private final Counter cbThreadPoolQueueSize;
-
-    @StatsDoc(
-            name = JOURNAL_CB_QUEUED_LATENCY,
-            help = "The journal callback queued latency"
-    )
-    private final OpStatsLogger journalCbQueuedLatency;
 
     @StatsDoc(
         name = JOURNAL_NUM_FLUSH_MAX_WAIT,
@@ -210,15 +184,11 @@ public class JournalStats {
         journalProcessTimeStats = statsLogger.getOpStatsLogger(BookKeeperServerStats.JOURNAL_PROCESS_TIME_LATENCY);
         forceWriteGroupingCountStats =
                 statsLogger.getOpStatsLogger(BookKeeperServerStats.JOURNAL_FORCE_WRITE_GROUPING_COUNT);
-        forceWriteGroupingFailures = statsLogger.getCounter(JOURNAL_FORCE_WRITE_GROUPING_FAILURES);
         forceWriteBatchEntriesStats =
                 statsLogger.getOpStatsLogger(BookKeeperServerStats.JOURNAL_FORCE_WRITE_BATCH_ENTRIES);
         forceWriteBatchBytesStats = statsLogger.getOpStatsLogger(BookKeeperServerStats.JOURNAL_FORCE_WRITE_BATCH_BYTES);
         journalQueueSize = statsLogger.getCounter(BookKeeperServerStats.JOURNAL_QUEUE_SIZE);
         forceWriteQueueSize = statsLogger.getCounter(BookKeeperServerStats.JOURNAL_FORCE_WRITE_QUEUE_SIZE);
-        journalCbQueueSize = statsLogger.getCounter(BookKeeperServerStats.JOURNAL_CB_QUEUE_SIZE);
-        cbThreadPoolQueueSize = statsLogger.getCounter(BookKeeperServerStats.CB_THREAD_POOL_QUEUE_SIZE);
-        journalCbQueuedLatency = statsLogger.getOpStatsLogger(BookKeeperServerStats.JOURNAL_CB_QUEUED_LATENCY);
         flushMaxWaitCounter = statsLogger.getCounter(BookKeeperServerStats.JOURNAL_NUM_FLUSH_MAX_WAIT);
         flushMaxOutstandingBytesCounter =
                 statsLogger.getCounter(BookKeeperServerStats.JOURNAL_NUM_FLUSH_MAX_OUTSTANDING_BYTES);
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/OpStatTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/OpStatTest.java
index 4e818389bc..c5abdc521a 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/OpStatTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/OpStatTest.java
@@ -22,7 +22,6 @@
 package org.apache.bookkeeper.test;
 
 import static org.apache.bookkeeper.bookie.BookKeeperServerStats.BOOKIE_SCOPE;
-import static org.apache.bookkeeper.bookie.BookKeeperServerStats.JOURNAL_CB_QUEUE_SIZE;
 import static org.apache.bookkeeper.bookie.BookKeeperServerStats.JOURNAL_FORCE_WRITE_QUEUE_SIZE;
 import static org.apache.bookkeeper.bookie.BookKeeperServerStats.JOURNAL_QUEUE_SIZE;
 import static org.apache.bookkeeper.bookie.BookKeeperServerStats.JOURNAL_SCOPE;
@@ -114,7 +113,6 @@ public class OpStatTest extends BookKeeperClusterTestCase {
             assertTrue(average <= elapsed);
         });
         validateNonMonotonicCounterGauges(stats, new String[]{
-                BOOKIE_SCOPE + "." + JOURNAL_SCOPE + ".journalIndex_0." + JOURNAL_CB_QUEUE_SIZE,
                 BOOKIE_SCOPE + "." + JOURNAL_SCOPE + ".journalIndex_0." + JOURNAL_FORCE_WRITE_QUEUE_SIZE,
                 BOOKIE_SCOPE + "." + JOURNAL_SCOPE + ".journalIndex_0." + JOURNAL_QUEUE_SIZE
         }, (value, max) -> {