You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@bookkeeper.apache.org by mm...@apache.org on 2021/03/18 18:16:22 UTC

[bookkeeper] branch master updated: Issue 2622: Fix usage of JournalQueueSize and JournalCbQueueSize; JournalCbQueueSize should be bounded (#2623)

This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git


The following commit(s) were added to refs/heads/master by this push:
     new ec257fc  Issue 2622: Fix usage of JournalQueueSize and JournalCbQueueSize; JournalCbQueueSize should be bounded (#2623)
ec257fc is described below

commit ec257fc9b2840e2dd18ec4d8068c7319c405c3a0
Author: Michael Marshall <47...@users.noreply.github.com>
AuthorDate: Thu Mar 18 12:16:16 2021 -0600

    Issue 2622: Fix usage of JournalQueueSize and JournalCbQueueSize; JournalCbQueueSize should be bounded (#2623)
    
    * Fix usage of JournalQueueSize and JournalCbQueueSize; JournalCbQueueSize should be bounded
    
    * Fix capitalization
---
 .../main/java/org/apache/bookkeeper/bookie/Journal.java    | 14 +++++++++-----
 1 file changed, 9 insertions(+), 5 deletions(-)

diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java
index 1eec6a3..3486415 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java
@@ -390,7 +390,6 @@ public class Journal extends BookieCriticalThread implements CheckpointSource {
                     if (qe != null) {
                         cbThreadPool.execute(qe);
                     }
-                    journalStats.getJournalCbQueueSize().inc();
                 }
 
                 return forceWriteWaiters.size();
@@ -865,19 +864,22 @@ public class Journal extends BookieCriticalThread implements CheckpointSource {
         entry.retain();
 
         journalStats.getJournalQueueSize().inc();
+        journalStats.getJournalCbQueueSize().inc();
         queue.put(QueueEntry.create(
                 entry, ackBeforeSync,  ledgerId, entryId, cb, ctx, MathUtils.nowInNano(),
                 journalStats.getJournalAddEntryStats(),
-                journalStats.getJournalQueueSize()));
+                journalStats.getJournalCbQueueSize()));
     }
 
     void forceLedger(long ledgerId, WriteCallback cb, Object ctx) {
-        journalStats.getJournalQueueSize().inc();
         queue.add(QueueEntry.create(
-                null, false /* ackBeforeSync */,  ledgerId,
+                null, false /* ackBeforeSync */, ledgerId,
                 Bookie.METAENTRY_ID_FORCE_LEDGER, cb, ctx, MathUtils.nowInNano(),
                 journalStats.getJournalForceLedgerStats(),
-                journalStats.getJournalQueueSize()));
+                journalStats.getJournalCbQueueSize()));
+        // Increment afterwards because the add operation could fail.
+        journalStats.getJournalQueueSize().inc();
+        journalStats.getJournalCbQueueSize().inc();
     }
 
     /**
@@ -969,6 +971,7 @@ public class Journal extends BookieCriticalThread implements CheckpointSource {
                     if (numEntriesToFlush == 0) {
                         qe = queue.take();
                         dequeueStartTime = MathUtils.nowInNano();
+                        journalStats.getJournalQueueSize().dec();
                         journalStats.getJournalQueueStats()
                             .registerSuccessfulEvent(MathUtils.elapsedNanos(qe.enqueueTime), TimeUnit.NANOSECONDS);
                     } else {
@@ -981,6 +984,7 @@ public class Journal extends BookieCriticalThread implements CheckpointSource {
                         dequeueStartTime = MathUtils.nowInNano();
 
                         if (qe != null) {
+                            journalStats.getJournalQueueSize().dec();
                             journalStats.getJournalQueueStats()
                                 .registerSuccessfulEvent(MathUtils.elapsedNanos(qe.enqueueTime), TimeUnit.NANOSECONDS);
                         }