You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@bookkeeper.apache.org by GitBox <gi...@apache.org> on 2018/01/16 08:18:25 UTC

[GitHub] jiazhai closed pull request #968: Journal: add stat for cb queue size

jiazhai closed pull request #968: Journal: add stat for cb queue size
URL: https://github.com/apache/bookkeeper/pull/968
 
 
   

This PR was merged from a forked repository.
Because GitHub hides the original diff of a pull request from a fork
once it has been merged, the diff is reproduced below for the sake of
provenance:

diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookKeeperServerStats.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookKeeperServerStats.java
index 2e104cd56..c69c38fb2 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookKeeperServerStats.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookKeeperServerStats.java
@@ -111,6 +111,7 @@
     String INDEX_INMEM_ILLEGAL_STATE_RESET = "INDEX_INMEM_ILLEGAL_STATE_RESET";
     String INDEX_INMEM_ILLEGAL_STATE_DELETE = "INDEX_INMEM_ILLEGAL_STATE_DELETE";
     String JOURNAL_FORCE_WRITE_QUEUE_SIZE = "JOURNAL_FORCE_WRITE_QUEUE_SIZE";
+    String JOURNAL_CB_QUEUE_SIZE = "JOURNAL_CB_QUEUE_SIZE";
     String JOURNAL_NUM_FORCE_WRITES = "JOURNAL_NUM_FORCE_WRITES";
     String JOURNAL_NUM_FLUSH_EMPTY_QUEUE = "JOURNAL_NUM_FLUSH_EMPTY_QUEUE";
     String JOURNAL_NUM_FLUSH_MAX_OUTSTANDING_BYTES = "JOURNAL_NUM_FLUSH_MAX_OUTSTANDING_BYTES";
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java
index 1192fd9fd..29373d2d8 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java
@@ -285,9 +285,13 @@ public boolean accept(long journalId) {
         long enqueueTime;
 
         OpStatsLogger journalAddEntryStats;
+        Counter journalCbQueueSize;
 
-        static QueueEntry create(ByteBuf entry, long ledgerId, long entryId, WriteCallback cb, Object ctx,
-                long enqueueTime, OpStatsLogger journalAddEntryStats) {
+        static QueueEntry create(
+                ByteBuf entry, long ledgerId, long entryId, WriteCallback cb, Object ctx,
+                long enqueueTime,
+                OpStatsLogger journalAddEntryStats,
+                Counter journalCbQueueSize) {
             QueueEntry qe = RECYCLER.get();
             qe.entry = entry;
             qe.cb = cb;
@@ -296,6 +300,7 @@ static QueueEntry create(ByteBuf entry, long ledgerId, long entryId, WriteCallba
             qe.entryId = entryId;
             qe.enqueueTime = enqueueTime;
             qe.journalAddEntryStats = journalAddEntryStats;
+            qe.journalCbQueueSize = journalCbQueueSize;
             return qe;
         }
 
@@ -304,6 +309,7 @@ public void run() {
             if (LOG.isDebugEnabled()) {
                 LOG.debug("Acknowledge Ledger: {}, Entry: {}", ledgerId, entryId);
             }
+            journalCbQueueSize.dec();
             journalAddEntryStats.registerSuccessfulEvent(MathUtils.elapsedNanos(enqueueTime), TimeUnit.NANOSECONDS);
             cb.writeComplete(0, ledgerId, entryId, null, ctx);
             recycle();
@@ -351,6 +357,7 @@ public int process(boolean shouldForceWrite) throws IOException {
                 // Notify the waiters that the force write succeeded
                 for (int i = 0; i < forceWriteWaiters.size(); i++) {
+                    journalCbQueueSize.inc(); // inc before hand-off: the callback may dec() right away
                     cbThreadPool.execute(forceWriteWaiters.get(i));
                 }
 
                 return forceWriteWaiters.size();
@@ -583,6 +590,7 @@ static void writePaddingBytes(JournalChannel jc, ByteBuf paddingBuffer, int jour
     private final OpStatsLogger forceWriteBatchBytesStats;
     private final Counter journalQueueSize;
     private final Counter forceWriteQueueSize;
+    private final Counter journalCbQueueSize;
     private final Counter flushMaxWaitCounter;
     private final Counter flushMaxOutstandingBytesCounter;
     private final Counter flushEmptyQueueCounter;
@@ -635,6 +643,7 @@ public Journal(File journalDirectory, ServerConfiguration conf, LedgerDirsManage
         forceWriteBatchBytesStats = statsLogger.getOpStatsLogger(BookKeeperServerStats.JOURNAL_FORCE_WRITE_BATCH_BYTES);
         journalQueueSize = statsLogger.getCounter(BookKeeperServerStats.JOURNAL_QUEUE_SIZE);
         forceWriteQueueSize = statsLogger.getCounter(BookKeeperServerStats.JOURNAL_FORCE_WRITE_QUEUE_SIZE);
+        journalCbQueueSize = statsLogger.getCounter(BookKeeperServerStats.JOURNAL_CB_QUEUE_SIZE);
         flushMaxWaitCounter = statsLogger.getCounter(BookKeeperServerStats.JOURNAL_NUM_FLUSH_MAX_WAIT);
         flushMaxOutstandingBytesCounter =
                 statsLogger.getCounter(BookKeeperServerStats.JOURNAL_NUM_FLUSH_MAX_OUTSTANDING_BYTES);
@@ -824,7 +833,9 @@ public void logAddEntry(ByteBuf entry, WriteCallback cb, Object ctx) {
 
         //Retain entry until it gets written to journal
         entry.retain();
-        queue.add(QueueEntry.create(entry, ledgerId, entryId, cb, ctx, MathUtils.nowInNano(), journalAddEntryStats));
+        queue.add(QueueEntry.create(
+                entry, ledgerId, entryId, cb, ctx, MathUtils.nowInNano(),
+                journalAddEntryStats, journalCbQueueSize)); // run() dec()s the counter process() inc()s
     }
 
     /**
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/OpStatTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/OpStatTest.java
index 798f151bb..1d85dfe8a 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/OpStatTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/OpStatTest.java
@@ -21,6 +21,11 @@
 
 package org.apache.bookkeeper.test;
 
+import static org.apache.bookkeeper.bookie.BookKeeperServerStats.BOOKIE_SCOPE;
+import static org.apache.bookkeeper.bookie.BookKeeperServerStats.JOURNAL_CB_QUEUE_SIZE;
+import static org.apache.bookkeeper.bookie.BookKeeperServerStats.JOURNAL_FORCE_WRITE_QUEUE_SIZE;
+import static org.apache.bookkeeper.bookie.BookKeeperServerStats.JOURNAL_QUEUE_SIZE;
+import static org.apache.bookkeeper.bookie.BookKeeperServerStats.JOURNAL_SCOPE;
 import static org.apache.bookkeeper.bookie.BookKeeperServerStats.SERVER_SCOPE;
 import static org.junit.Assert.assertTrue;
 
@@ -71,6 +76,19 @@ private void validateOpStat(TestStatsProvider stats, String paths[], BiConsumer<
         }
     }
 
+    private void validateNonMonotonicCounterGauge(TestStatsProvider stats, String path, BiConsumer<Long, Long> f) {
+        assertTrue(stats != null);
+        TestStatsProvider.TestCounter counter = stats.getCounter(path);
+        assertTrue(counter != null);
+        f.accept(counter.get(), counter.getMax());
+    }
+
+    private void validateNonMonotonicCounterGauges(TestStatsProvider stats, String paths[], BiConsumer<Long, Long> f) {
+        for (String path : paths) {
+            validateNonMonotonicCounterGauge(stats, path, f);
+        }
+    }
+
     @Test
     public void testTopLevelBookieWriteCounters() throws Exception {
         long startNanos = MathUtils.nowInNano();
@@ -95,6 +113,13 @@ public void testTopLevelBookieWriteCounters() throws Exception {
             assertTrue(average > 0);
             assertTrue(average <= elapsed);
         });
+        validateNonMonotonicCounterGauges(stats, new String[]{
+                BOOKIE_SCOPE + "." + JOURNAL_SCOPE + "_0." + JOURNAL_CB_QUEUE_SIZE,
+                BOOKIE_SCOPE + "." + JOURNAL_SCOPE + "_0." + JOURNAL_FORCE_WRITE_QUEUE_SIZE,
+                BOOKIE_SCOPE + "." + JOURNAL_SCOPE + "_0." + JOURNAL_QUEUE_SIZE
+        }, (value, max) -> {
+            assertTrue(max > 0);
+        });
     }
 
     @Test
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/TestStatsProvider.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/TestStatsProvider.java
index b55ebad29..a268720de 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/TestStatsProvider.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/TestStatsProvider.java
@@ -43,6 +43,7 @@
      */
     public class TestCounter implements Counter {
         private AtomicLong val = new AtomicLong(0);
+        private AtomicLong max = new AtomicLong(0);
 
         @Override
         public void clear() {
@@ -51,7 +52,7 @@ public void clear() {
 
         @Override
         public void inc() {
-            val.incrementAndGet();
+            updateMax(val.incrementAndGet());
         }
 
         @Override
@@ -61,13 +62,29 @@ public void dec() {
 
         @Override
         public void add(long delta) {
-            val.addAndGet(delta);
+            updateMax(val.addAndGet(delta));
         }
 
         @Override
         public Long get() {
             return val.get();
         }
+
+        private void updateMax(long newVal) { // raise max to newVal if larger, retrying on CAS failure
+            while (true) {
+                long curMax = max.get();
+                if (curMax >= newVal) { // max already covers newVal: skip the no-op CAS
+                    break;
+                }
+                if (max.compareAndSet(curMax, newVal)) { // false means max changed meanwhile: retry
+                    break;
+                }
+            }
+        }
+
+        public Long getMax() {
+            return max.get();
+        }
     }
 
     /**


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services