You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@bookkeeper.apache.org by yo...@apache.org on 2022/08/02 06:26:13 UTC

[bookkeeper] 20/22: add metric cbThreadPoolQueueSize (#3424)

This is an automated email from the ASF dual-hosted git repository.

yong pushed a commit to branch branch-4.15
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git

commit 7df24b738cd1e9a938f0f25232d9f5c1a90cc8a1
Author: LinChen <15...@qq.com>
AuthorDate: Fri Jul 29 15:30:20 2022 +0800

    add metric cbThreadPoolQueueSize (#3424)
    
    ### Motivation
    Add a new metric, cbThreadPoolQueueSize, that tracks the number of callbacks currently queued in the cbThreadPool thread pool.
    
    (cherry picked from commit 35c71ab1626459fdac111a8164e359e871b732e4)
---
 .../org/apache/bookkeeper/bookie/BookKeeperServerStats.java   |  1 +
 .../src/main/java/org/apache/bookkeeper/bookie/Journal.java   | 11 +++++++++--
 .../java/org/apache/bookkeeper/bookie/stats/JournalStats.java |  7 +++++++
 3 files changed, 17 insertions(+), 2 deletions(-)

diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookKeeperServerStats.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookKeeperServerStats.java
index f13747f9f6..094b58d0f0 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookKeeperServerStats.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookKeeperServerStats.java
@@ -163,6 +163,7 @@ public interface BookKeeperServerStats {
     String INDEX_INMEM_ILLEGAL_STATE_DELETE = "INDEX_INMEM_ILLEGAL_STATE_DELETE";
     String JOURNAL_FORCE_WRITE_QUEUE_SIZE = "JOURNAL_FORCE_WRITE_QUEUE_SIZE";
     String JOURNAL_CB_QUEUE_SIZE = "JOURNAL_CB_QUEUE_SIZE";
+    String CB_THREAD_POOL_QUEUE_SIZE = "CB_THREAD_POOL_QUEUE_SIZE";
     String JOURNAL_CB_QUEUED_LATENCY = "JOURNAL_CB_QUEUED_LATENCY";
     String JOURNAL_NUM_FORCE_WRITES = "JOURNAL_NUM_FORCE_WRITES";
     String JOURNAL_NUM_FLUSH_EMPTY_QUEUE = "JOURNAL_NUM_FLUSH_EMPTY_QUEUE";
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java
index 9de7dcda57..46f48b1f90 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java
@@ -309,12 +309,13 @@ public class Journal extends BookieCriticalThread implements CheckpointSource {
         OpStatsLogger journalAddEntryStats;
         OpStatsLogger journalCbQueuedLatency;
         Counter journalCbQueueSize;
+        Counter cbThreadPoolQueueSize;
         Counter callbackTime;
 
         static QueueEntry create(ByteBuf entry, boolean ackBeforeSync, long ledgerId, long entryId,
                 WriteCallback cb, Object ctx, long enqueueTime, OpStatsLogger journalAddEntryStats,
-                Counter journalCbQueueSize, OpStatsLogger journalCbQueuedLatency,
-                Counter callbackTime) {
+                Counter journalCbQueueSize, Counter cbThreadPoolQueueSize,
+                OpStatsLogger journalCbQueuedLatency, Counter callbackTime) {
             QueueEntry qe = RECYCLER.get();
             qe.entry = entry;
             qe.ackBeforeSync = ackBeforeSync;
@@ -326,6 +327,7 @@ public class Journal extends BookieCriticalThread implements CheckpointSource {
             qe.journalAddEntryStats = journalAddEntryStats;
             qe.journalCbQueuedLatency = journalCbQueuedLatency;
             qe.journalCbQueueSize = journalCbQueueSize;
+            qe.cbThreadPoolQueueSize = cbThreadPoolQueueSize;
             qe.callbackTime = callbackTime;
             return qe;
         }
@@ -343,6 +345,7 @@ public class Journal extends BookieCriticalThread implements CheckpointSource {
                 LOG.debug("Acknowledge Ledger: {}, Entry: {}", ledgerId, entryId);
             }
             journalCbQueueSize.dec();
+            cbThreadPoolQueueSize.dec();
             journalAddEntryStats.registerSuccessfulEvent(MathUtils.elapsedNanos(enqueueTime), TimeUnit.NANOSECONDS);
             cb.writeComplete(0, ledgerId, entryId, null, ctx);
             recycle();
@@ -403,6 +406,7 @@ public class Journal extends BookieCriticalThread implements CheckpointSource {
                     QueueEntry qe = forceWriteWaiters.get(i);
                     if (qe != null) {
                         qe.setEnqueueCbThreadPooleQueueTime(MathUtils.nowInNano());
+                        journalStats.getCbThreadPoolQueueSize().inc();
                         cbThreadPool.execute(qe);
                     }
                 }
@@ -963,6 +967,7 @@ public class Journal extends BookieCriticalThread implements CheckpointSource {
                 entry, ackBeforeSync,  ledgerId, entryId, cb, ctx, MathUtils.nowInNano(),
                 journalStats.getJournalAddEntryStats(),
                 journalStats.getJournalCbQueueSize(),
+                journalStats.getCbThreadPoolQueueSize(),
                 journalStats.getJournalCbQueuedLatency(),
                 callbackTime));
     }
@@ -973,6 +978,7 @@ public class Journal extends BookieCriticalThread implements CheckpointSource {
                 BookieImpl.METAENTRY_ID_FORCE_LEDGER, cb, ctx, MathUtils.nowInNano(),
                 journalStats.getJournalForceLedgerStats(),
                 journalStats.getJournalCbQueueSize(),
+                journalStats.getCbThreadPoolQueueSize(),
                 journalStats.getJournalCbQueuedLatency(),
                 callbackTime));
         // Increment afterwards because the add operation could fail.
@@ -1141,6 +1147,7 @@ public class Journal extends BookieCriticalThread implements CheckpointSource {
                                     toFlush.set(i, null);
                                     numEntriesToFlush--;
                                     entry.setEnqueueCbThreadPooleQueueTime(MathUtils.nowInNano());
+                                    journalStats.getCbThreadPoolQueueSize().inc();
                                     cbThreadPool.execute(entry);
                                 }
                             }
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/stats/JournalStats.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/stats/JournalStats.java
index a35d74d566..b136a886e1 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/stats/JournalStats.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/stats/JournalStats.java
@@ -155,6 +155,12 @@ public class JournalStats {
     )
     private final Counter journalCbQueueSize;
 
+    @StatsDoc(
+            name = BookKeeperServerStats.CB_THREAD_POOL_QUEUE_SIZE,
+            help = "The queue size of cbThreadPool"
+    )
+    private final Counter cbThreadPoolQueueSize;
+
     @StatsDoc(
             name = JOURNAL_CB_QUEUED_LATENCY,
             help = "The journal callback queued latency"
@@ -211,6 +217,7 @@ public class JournalStats {
         journalQueueSize = statsLogger.getCounter(BookKeeperServerStats.JOURNAL_QUEUE_SIZE);
         forceWriteQueueSize = statsLogger.getCounter(BookKeeperServerStats.JOURNAL_FORCE_WRITE_QUEUE_SIZE);
         journalCbQueueSize = statsLogger.getCounter(BookKeeperServerStats.JOURNAL_CB_QUEUE_SIZE);
+        cbThreadPoolQueueSize = statsLogger.getCounter(BookKeeperServerStats.CB_THREAD_POOL_QUEUE_SIZE);
         journalCbQueuedLatency = statsLogger.getOpStatsLogger(BookKeeperServerStats.JOURNAL_CB_QUEUED_LATENCY);
         flushMaxWaitCounter = statsLogger.getCounter(BookKeeperServerStats.JOURNAL_NUM_FLUSH_MAX_WAIT);
         flushMaxOutstandingBytesCounter =