You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@bookkeeper.apache.org by zh...@apache.org on 2018/01/16 08:18:19 UTC

[bookkeeper] branch master updated: Journal: add stat for cb queue size

This is an automated email from the ASF dual-hosted git repository.

zhaijia pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git


The following commit(s) were added to refs/heads/master by this push:
     new 2d9c327  Journal: add stat for cb queue size
2d9c327 is described below

commit 2d9c327fee60f5025ce9adad4d7c863958ab59ef
Author: Samuel Just <sj...@salesforce.com>
AuthorDate: Tue Jan 16 16:18:09 2018 +0800

    Journal: add stat for cb queue size
    
    Also adds a bit of functionality to TestStatsProvider to facilitate
    testing.
    
    (bug W-3019451)
    Signed-off-by: Dustin Castor <dcastor@salesforce.com>
    [Adapted to current branch, added simple test]
    Signed-off-by: Samuel Just <sjust@salesforce.com>
    
    Author: Samuel Just <sj...@salesforce.com>
    
    Reviewers: Enrico Olivelli <eo...@gmail.com>, Jia Zhai <None>, Sijie Guo <si...@apache.org>
    
    This closes #968 from athanatos/forupstream/stats1/journal
---
 .../bookkeeper/bookie/BookKeeperServerStats.java   |  1 +
 .../java/org/apache/bookkeeper/bookie/Journal.java | 17 ++++++++++++---
 .../org/apache/bookkeeper/test/OpStatTest.java     | 25 ++++++++++++++++++++++
 .../apache/bookkeeper/test/TestStatsProvider.java  | 21 ++++++++++++++++--
 4 files changed, 59 insertions(+), 5 deletions(-)

diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookKeeperServerStats.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookKeeperServerStats.java
index 636e9fd..8b1665e 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookKeeperServerStats.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookKeeperServerStats.java
@@ -126,6 +126,7 @@ public interface BookKeeperServerStats {
     String INDEX_INMEM_ILLEGAL_STATE_RESET = "INDEX_INMEM_ILLEGAL_STATE_RESET";
     String INDEX_INMEM_ILLEGAL_STATE_DELETE = "INDEX_INMEM_ILLEGAL_STATE_DELETE";
     String JOURNAL_FORCE_WRITE_QUEUE_SIZE = "JOURNAL_FORCE_WRITE_QUEUE_SIZE";
+    String JOURNAL_CB_QUEUE_SIZE = "JOURNAL_CB_QUEUE_SIZE";
     String JOURNAL_NUM_FORCE_WRITES = "JOURNAL_NUM_FORCE_WRITES";
     String JOURNAL_NUM_FLUSH_EMPTY_QUEUE = "JOURNAL_NUM_FLUSH_EMPTY_QUEUE";
     String JOURNAL_NUM_FLUSH_MAX_OUTSTANDING_BYTES = "JOURNAL_NUM_FLUSH_MAX_OUTSTANDING_BYTES";
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java
index 1192fd9..29373d2 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java
@@ -285,9 +285,13 @@ public class Journal extends BookieCriticalThread implements CheckpointSource {
         long enqueueTime;
 
         OpStatsLogger journalAddEntryStats;
+        Counter journalCbQueueSize;
 
-        static QueueEntry create(ByteBuf entry, long ledgerId, long entryId, WriteCallback cb, Object ctx,
-                long enqueueTime, OpStatsLogger journalAddEntryStats) {
+        static QueueEntry create(
+                ByteBuf entry, long ledgerId, long entryId, WriteCallback cb, Object ctx,
+                long enqueueTime,
+                OpStatsLogger journalAddEntryStats,
+                Counter journalCbQueueSize) {
             QueueEntry qe = RECYCLER.get();
             qe.entry = entry;
             qe.cb = cb;
@@ -296,6 +300,7 @@ public class Journal extends BookieCriticalThread implements CheckpointSource {
             qe.entryId = entryId;
             qe.enqueueTime = enqueueTime;
             qe.journalAddEntryStats = journalAddEntryStats;
+            qe.journalCbQueueSize = journalCbQueueSize;
             return qe;
         }
 
@@ -304,6 +309,7 @@ public class Journal extends BookieCriticalThread implements CheckpointSource {
             if (LOG.isDebugEnabled()) {
                 LOG.debug("Acknowledge Ledger: {}, Entry: {}", ledgerId, entryId);
             }
+            journalCbQueueSize.dec();
             journalAddEntryStats.registerSuccessfulEvent(MathUtils.elapsedNanos(enqueueTime), TimeUnit.NANOSECONDS);
             cb.writeComplete(0, ledgerId, entryId, null, ctx);
             recycle();
@@ -351,6 +357,7 @@ public class Journal extends BookieCriticalThread implements CheckpointSource {
                 // Notify the waiters that the force write succeeded
                 for (int i = 0; i < forceWriteWaiters.size(); i++) {
                     cbThreadPool.execute(forceWriteWaiters.get(i));
+                    journalCbQueueSize.inc();
                 }
 
                 return forceWriteWaiters.size();
@@ -583,6 +590,7 @@ public class Journal extends BookieCriticalThread implements CheckpointSource {
     private final OpStatsLogger forceWriteBatchBytesStats;
     private final Counter journalQueueSize;
     private final Counter forceWriteQueueSize;
+    private final Counter journalCbQueueSize;
     private final Counter flushMaxWaitCounter;
     private final Counter flushMaxOutstandingBytesCounter;
     private final Counter flushEmptyQueueCounter;
@@ -635,6 +643,7 @@ public class Journal extends BookieCriticalThread implements CheckpointSource {
         forceWriteBatchBytesStats = statsLogger.getOpStatsLogger(BookKeeperServerStats.JOURNAL_FORCE_WRITE_BATCH_BYTES);
         journalQueueSize = statsLogger.getCounter(BookKeeperServerStats.JOURNAL_QUEUE_SIZE);
         forceWriteQueueSize = statsLogger.getCounter(BookKeeperServerStats.JOURNAL_FORCE_WRITE_QUEUE_SIZE);
+        journalCbQueueSize = statsLogger.getCounter(BookKeeperServerStats.JOURNAL_CB_QUEUE_SIZE);
         flushMaxWaitCounter = statsLogger.getCounter(BookKeeperServerStats.JOURNAL_NUM_FLUSH_MAX_WAIT);
         flushMaxOutstandingBytesCounter =
                 statsLogger.getCounter(BookKeeperServerStats.JOURNAL_NUM_FLUSH_MAX_OUTSTANDING_BYTES);
@@ -824,7 +833,9 @@ public class Journal extends BookieCriticalThread implements CheckpointSource {
 
         //Retain entry until it gets written to journal
         entry.retain();
-        queue.add(QueueEntry.create(entry, ledgerId, entryId, cb, ctx, MathUtils.nowInNano(), journalAddEntryStats));
+        queue.add(QueueEntry.create(
+                entry, ledgerId, entryId, cb, ctx, MathUtils.nowInNano(),
+                journalAddEntryStats, journalCbQueueSize)); // the counter the entry's callback dec()s
     }
 
     /**
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/OpStatTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/OpStatTest.java
index 798f151..1d85dfe 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/OpStatTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/OpStatTest.java
@@ -21,6 +21,11 @@
 
 package org.apache.bookkeeper.test;
 
+import static org.apache.bookkeeper.bookie.BookKeeperServerStats.BOOKIE_SCOPE;
+import static org.apache.bookkeeper.bookie.BookKeeperServerStats.JOURNAL_CB_QUEUE_SIZE;
+import static org.apache.bookkeeper.bookie.BookKeeperServerStats.JOURNAL_FORCE_WRITE_QUEUE_SIZE;
+import static org.apache.bookkeeper.bookie.BookKeeperServerStats.JOURNAL_QUEUE_SIZE;
+import static org.apache.bookkeeper.bookie.BookKeeperServerStats.JOURNAL_SCOPE;
 import static org.apache.bookkeeper.bookie.BookKeeperServerStats.SERVER_SCOPE;
 import static org.junit.Assert.assertTrue;
 
@@ -71,6 +76,19 @@ public class OpStatTest extends BookKeeperClusterTestCase {
         }
     }
 
+    private void validateNonMonotonicCounterGauge(TestStatsProvider stats, String path, BiConsumer<Long, Long> f) {
+        assertTrue(stats != null);
+        TestStatsProvider.TestCounter counter = stats.getCounter(path);
+        assertTrue(counter != null);
+        f.accept(counter.get(), counter.getMax());
+    }
+
+    private void validateNonMonotonicCounterGauges(TestStatsProvider stats, String paths[], BiConsumer<Long, Long> f) {
+        for (String path : paths) {
+            validateNonMonotonicCounterGauge(stats, path, f);
+        }
+    }
+
     @Test
     public void testTopLevelBookieWriteCounters() throws Exception {
         long startNanos = MathUtils.nowInNano();
@@ -95,6 +113,13 @@ public class OpStatTest extends BookKeeperClusterTestCase {
             assertTrue(average > 0);
             assertTrue(average <= elapsed);
         });
+        validateNonMonotonicCounterGauges(stats, new String[]{
+                BOOKIE_SCOPE + "." + JOURNAL_SCOPE + "_0." + JOURNAL_CB_QUEUE_SIZE,
+                BOOKIE_SCOPE + "." + JOURNAL_SCOPE + "_0." + JOURNAL_FORCE_WRITE_QUEUE_SIZE,
+                BOOKIE_SCOPE + "." + JOURNAL_SCOPE + "_0." + JOURNAL_QUEUE_SIZE
+        }, (value, max) -> {
+            assertTrue(max > 0);
+        });
     }
 
     @Test
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/TestStatsProvider.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/TestStatsProvider.java
index b71031a..ac6bfaf 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/TestStatsProvider.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/TestStatsProvider.java
@@ -43,6 +43,7 @@ public class TestStatsProvider implements StatsProvider {
      */
     public class TestCounter implements Counter {
         private AtomicLong val = new AtomicLong(0);
+        private AtomicLong max = new AtomicLong(0);
 
         @Override
         public void clear() {
@@ -51,7 +52,7 @@ public class TestStatsProvider implements StatsProvider {
 
         @Override
         public void inc() {
-            val.incrementAndGet();
+            updateMax(val.incrementAndGet());
         }
 
         @Override
@@ -61,13 +62,29 @@ public class TestStatsProvider implements StatsProvider {
 
         @Override
         public void add(long delta) {
-            val.addAndGet(delta);
+            updateMax(val.addAndGet(delta));
         }
 
         @Override
         public Long get() {
             return val.get();
         }
+
+        private void updateMax(long newVal) {
+            while (true) {
+                long curMax = max.get();
+                if (curMax > newVal) {
+                    break;
+                }
+                if (max.compareAndSet(curMax, newVal)) {
+                    break;
+                }
+            }
+        }
+
+        public Long getMax() {
+            return max.get();
+        }
     }
 
     /**

-- 
To stop receiving notification emails like this one, please contact
commits@bookkeeper.apache.org.