You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@bookkeeper.apache.org by eo...@apache.org on 2022/07/07 07:52:05 UTC

[bookkeeper] branch master updated: Metrics: add journalCbQueueLatency (#3364)

This is an automated email from the ASF dual-hosted git repository.

eolivelli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git


The following commit(s) were added to refs/heads/master by this push:
     new f7732921e7 Metrics: add journalCbQueueLatency (#3364)
f7732921e7 is described below

commit f7732921e72021e2f42dff32b12796f290249a33
Author: LinChen <15...@qq.com>
AuthorDate: Thu Jul 7 15:52:00 2022 +0800

    Metrics: add journalCbQueueLatency (#3364)
---
 .../apache/bookkeeper/bookie/BookKeeperServerStats.java  |  1 +
 .../main/java/org/apache/bookkeeper/bookie/Journal.java  | 16 +++++++++++++++-
 .../org/apache/bookkeeper/bookie/stats/JournalStats.java |  9 +++++++++
 3 files changed, 25 insertions(+), 1 deletion(-)

diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookKeeperServerStats.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookKeeperServerStats.java
index 1e245a4dab..8eda11f26c 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookKeeperServerStats.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookKeeperServerStats.java
@@ -162,6 +162,7 @@ public interface BookKeeperServerStats {
     String INDEX_INMEM_ILLEGAL_STATE_DELETE = "INDEX_INMEM_ILLEGAL_STATE_DELETE";
     String JOURNAL_FORCE_WRITE_QUEUE_SIZE = "JOURNAL_FORCE_WRITE_QUEUE_SIZE";
     String JOURNAL_CB_QUEUE_SIZE = "JOURNAL_CB_QUEUE_SIZE";
+    String JOURNAL_CB_QUEUED_LATENCY = "JOURNAL_CB_QUEUED_LATENCY";
     String JOURNAL_NUM_FORCE_WRITES = "JOURNAL_NUM_FORCE_WRITES";
     String JOURNAL_NUM_FLUSH_EMPTY_QUEUE = "JOURNAL_NUM_FLUSH_EMPTY_QUEUE";
     String JOURNAL_NUM_FLUSH_MAX_OUTSTANDING_BYTES = "JOURNAL_NUM_FLUSH_MAX_OUTSTANDING_BYTES";
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java
index 447f2409ba..eec4ca724b 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java
@@ -303,15 +303,18 @@ public class Journal extends BookieCriticalThread implements CheckpointSource {
         WriteCallback cb;
         Object ctx;
         long enqueueTime;
+        long enqueueCbThreadPooleQueueTime;
         boolean ackBeforeSync;
 
         OpStatsLogger journalAddEntryStats;
+        OpStatsLogger journalCbQueuedLatency;
         Counter journalCbQueueSize;
         Counter callbackTime;
 
         static QueueEntry create(ByteBuf entry, boolean ackBeforeSync, long ledgerId, long entryId,
                 WriteCallback cb, Object ctx, long enqueueTime, OpStatsLogger journalAddEntryStats,
-                Counter journalCbQueueSize, Counter callbackTime) {
+                Counter journalCbQueueSize, OpStatsLogger journalCbQueuedLatency,
+                Counter callbackTime) {
             QueueEntry qe = RECYCLER.get();
             qe.entry = entry;
             qe.ackBeforeSync = ackBeforeSync;
@@ -321,13 +324,20 @@ public class Journal extends BookieCriticalThread implements CheckpointSource {
             qe.entryId = entryId;
             qe.enqueueTime = enqueueTime;
             qe.journalAddEntryStats = journalAddEntryStats;
+            qe.journalCbQueuedLatency = journalCbQueuedLatency;
             qe.journalCbQueueSize = journalCbQueueSize;
             qe.callbackTime = callbackTime;
             return qe;
         }
 
+        public void setEnqueueCbThreadPooleQueueTime(long enqueueCbThreadPooleQueueTime) {
+            this.enqueueCbThreadPooleQueueTime = enqueueCbThreadPooleQueueTime;
+        }
+
         @Override
         public void run() {
+            journalCbQueuedLatency.registerSuccessfulEvent(
+                    MathUtils.elapsedNanos(enqueueCbThreadPooleQueueTime), TimeUnit.NANOSECONDS);
             long startTime = System.nanoTime();
             if (LOG.isDebugEnabled()) {
                 LOG.debug("Acknowledge Ledger: {}, Entry: {}", ledgerId, entryId);
@@ -392,6 +402,7 @@ public class Journal extends BookieCriticalThread implements CheckpointSource {
                 for (int i = 0; i < forceWriteWaiters.size(); i++) {
                     QueueEntry qe = forceWriteWaiters.get(i);
                     if (qe != null) {
+                        qe.setEnqueueCbThreadPooleQueueTime(MathUtils.nowInNano());
                         cbThreadPool.execute(qe);
                     }
                 }
@@ -943,6 +954,7 @@ public class Journal extends BookieCriticalThread implements CheckpointSource {
                 entry, ackBeforeSync,  ledgerId, entryId, cb, ctx, MathUtils.nowInNano(),
                 journalStats.getJournalAddEntryStats(),
                 journalStats.getJournalCbQueueSize(),
+                journalStats.getJournalCbQueuedLatency(),
                 callbackTime));
     }
 
@@ -952,6 +964,7 @@ public class Journal extends BookieCriticalThread implements CheckpointSource {
                 BookieImpl.METAENTRY_ID_FORCE_LEDGER, cb, ctx, MathUtils.nowInNano(),
                 journalStats.getJournalForceLedgerStats(),
                 journalStats.getJournalCbQueueSize(),
+                journalStats.getJournalCbQueuedLatency(),
                 callbackTime));
         // Increment afterwards because the add operation could fail.
         journalStats.getJournalQueueSize().inc();
@@ -1118,6 +1131,7 @@ public class Journal extends BookieCriticalThread implements CheckpointSource {
                                 if (entry != null && (!syncData || entry.ackBeforeSync)) {
                                     toFlush.set(i, null);
                                     numEntriesToFlush--;
+                                    entry.setEnqueueCbThreadPooleQueueTime(MathUtils.nowInNano());
                                     cbThreadPool.execute(entry);
                                 }
                             }
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/stats/JournalStats.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/stats/JournalStats.java
index 133936de3e..a35d74d566 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/stats/JournalStats.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/stats/JournalStats.java
@@ -23,6 +23,7 @@ import static org.apache.bookkeeper.bookie.BookKeeperServerStats.ADD_ENTRY;
 import static org.apache.bookkeeper.bookie.BookKeeperServerStats.CATEGORY_SERVER;
 import static org.apache.bookkeeper.bookie.BookKeeperServerStats.FORCE_LEDGER;
 import static org.apache.bookkeeper.bookie.BookKeeperServerStats.JOURNAL_ADD_ENTRY;
+import static org.apache.bookkeeper.bookie.BookKeeperServerStats.JOURNAL_CB_QUEUED_LATENCY;
 import static org.apache.bookkeeper.bookie.BookKeeperServerStats.JOURNAL_CB_QUEUE_SIZE;
 import static org.apache.bookkeeper.bookie.BookKeeperServerStats.JOURNAL_CREATION_LATENCY;
 import static org.apache.bookkeeper.bookie.BookKeeperServerStats.JOURNAL_FLUSH_LATENCY;
@@ -153,6 +154,13 @@ public class JournalStats {
         help = "The journal callback queue size"
     )
     private final Counter journalCbQueueSize;
+
+    @StatsDoc(
+            name = JOURNAL_CB_QUEUED_LATENCY,
+            help = "The journal callback queued latency"
+    )
+    private final OpStatsLogger journalCbQueuedLatency;
+
     @StatsDoc(
         name = JOURNAL_NUM_FLUSH_MAX_WAIT,
         help = "The number of journal flushes triggered by MAX_WAIT time"
@@ -203,6 +211,7 @@ public class JournalStats {
         journalQueueSize = statsLogger.getCounter(BookKeeperServerStats.JOURNAL_QUEUE_SIZE);
         forceWriteQueueSize = statsLogger.getCounter(BookKeeperServerStats.JOURNAL_FORCE_WRITE_QUEUE_SIZE);
         journalCbQueueSize = statsLogger.getCounter(BookKeeperServerStats.JOURNAL_CB_QUEUE_SIZE);
+        journalCbQueuedLatency = statsLogger.getOpStatsLogger(BookKeeperServerStats.JOURNAL_CB_QUEUED_LATENCY);
         flushMaxWaitCounter = statsLogger.getCounter(BookKeeperServerStats.JOURNAL_NUM_FLUSH_MAX_WAIT);
         flushMaxOutstandingBytesCounter =
                 statsLogger.getCounter(BookKeeperServerStats.JOURNAL_NUM_FLUSH_MAX_OUTSTANDING_BYTES);