You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@bookkeeper.apache.org by zh...@apache.org on 2018/01/16 08:18:19 UTC
[bookkeeper] branch master updated: Journal: add stat for cb queue
size
This is an automated email from the ASF dual-hosted git repository.
zhaijia pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git
The following commit(s) were added to refs/heads/master by this push:
new 2d9c327 Journal: add stat for cb queue size
2d9c327 is described below
commit 2d9c327fee60f5025ce9adad4d7c863958ab59ef
Author: Samuel Just <sj...@salesforce.com>
AuthorDate: Tue Jan 16 16:18:09 2018 +0800
Journal: add stat for cb queue size
Also adds a bit of functionality to TestStatsProvider to facilitate
testing.
(bug W-3019451)
Signed-off-by: Dustin Castor <dcastorsalesforce.com>
[Adapted to current branch, added simple test]
Signed-off-by: Samuel Just <sjustsalesforce.com>
Author: Samuel Just <sj...@salesforce.com>
Reviewers: Enrico Olivelli <eo...@gmail.com>, Jia Zhai <None>, Sijie Guo <si...@apache.org>
This closes #968 from athanatos/forupstream/stats1/journal
---
.../bookkeeper/bookie/BookKeeperServerStats.java | 1 +
.../java/org/apache/bookkeeper/bookie/Journal.java | 17 ++++++++++++---
.../org/apache/bookkeeper/test/OpStatTest.java | 25 ++++++++++++++++++++++
.../apache/bookkeeper/test/TestStatsProvider.java | 21 ++++++++++++++++--
4 files changed, 59 insertions(+), 5 deletions(-)
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookKeeperServerStats.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookKeeperServerStats.java
index 636e9fd..8b1665e 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookKeeperServerStats.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookKeeperServerStats.java
@@ -126,6 +126,7 @@ public interface BookKeeperServerStats {
String INDEX_INMEM_ILLEGAL_STATE_RESET = "INDEX_INMEM_ILLEGAL_STATE_RESET";
String INDEX_INMEM_ILLEGAL_STATE_DELETE = "INDEX_INMEM_ILLEGAL_STATE_DELETE";
String JOURNAL_FORCE_WRITE_QUEUE_SIZE = "JOURNAL_FORCE_WRITE_QUEUE_SIZE";
+ String JOURNAL_CB_QUEUE_SIZE = "JOURNAL_CB_QUEUE_SIZE";
String JOURNAL_NUM_FORCE_WRITES = "JOURNAL_NUM_FORCE_WRITES";
String JOURNAL_NUM_FLUSH_EMPTY_QUEUE = "JOURNAL_NUM_FLUSH_EMPTY_QUEUE";
String JOURNAL_NUM_FLUSH_MAX_OUTSTANDING_BYTES = "JOURNAL_NUM_FLUSH_MAX_OUTSTANDING_BYTES";
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java
index 1192fd9..29373d2 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java
@@ -285,9 +285,13 @@ public class Journal extends BookieCriticalThread implements CheckpointSource {
long enqueueTime;
OpStatsLogger journalAddEntryStats;
+ Counter journalCbQueueSize;
- static QueueEntry create(ByteBuf entry, long ledgerId, long entryId, WriteCallback cb, Object ctx,
- long enqueueTime, OpStatsLogger journalAddEntryStats) {
+ static QueueEntry create(
+ ByteBuf entry, long ledgerId, long entryId, WriteCallback cb, Object ctx,
+ long enqueueTime,
+ OpStatsLogger journalAddEntryStats,
+ Counter journalCbQueueSize) {
QueueEntry qe = RECYCLER.get();
qe.entry = entry;
qe.cb = cb;
@@ -296,6 +300,7 @@ public class Journal extends BookieCriticalThread implements CheckpointSource {
qe.entryId = entryId;
qe.enqueueTime = enqueueTime;
qe.journalAddEntryStats = journalAddEntryStats;
+ qe.journalCbQueueSize = journalCbQueueSize;
return qe;
}
@@ -304,6 +309,7 @@ public class Journal extends BookieCriticalThread implements CheckpointSource {
if (LOG.isDebugEnabled()) {
LOG.debug("Acknowledge Ledger: {}, Entry: {}", ledgerId, entryId);
}
+ journalCbQueueSize.dec();
journalAddEntryStats.registerSuccessfulEvent(MathUtils.elapsedNanos(enqueueTime), TimeUnit.NANOSECONDS);
cb.writeComplete(0, ledgerId, entryId, null, ctx);
recycle();
@@ -351,6 +357,7 @@ public class Journal extends BookieCriticalThread implements CheckpointSource {
// Notify the waiters that the force write succeeded
for (int i = 0; i < forceWriteWaiters.size(); i++) {
cbThreadPool.execute(forceWriteWaiters.get(i));
+ journalCbQueueSize.inc();
}
return forceWriteWaiters.size();
@@ -583,6 +590,7 @@ public class Journal extends BookieCriticalThread implements CheckpointSource {
private final OpStatsLogger forceWriteBatchBytesStats;
private final Counter journalQueueSize;
private final Counter forceWriteQueueSize;
+ private final Counter journalCbQueueSize;
private final Counter flushMaxWaitCounter;
private final Counter flushMaxOutstandingBytesCounter;
private final Counter flushEmptyQueueCounter;
@@ -635,6 +643,7 @@ public class Journal extends BookieCriticalThread implements CheckpointSource {
forceWriteBatchBytesStats = statsLogger.getOpStatsLogger(BookKeeperServerStats.JOURNAL_FORCE_WRITE_BATCH_BYTES);
journalQueueSize = statsLogger.getCounter(BookKeeperServerStats.JOURNAL_QUEUE_SIZE);
forceWriteQueueSize = statsLogger.getCounter(BookKeeperServerStats.JOURNAL_FORCE_WRITE_QUEUE_SIZE);
+ journalCbQueueSize = statsLogger.getCounter(BookKeeperServerStats.JOURNAL_CB_QUEUE_SIZE);
flushMaxWaitCounter = statsLogger.getCounter(BookKeeperServerStats.JOURNAL_NUM_FLUSH_MAX_WAIT);
flushMaxOutstandingBytesCounter =
statsLogger.getCounter(BookKeeperServerStats.JOURNAL_NUM_FLUSH_MAX_OUTSTANDING_BYTES);
@@ -824,7 +833,9 @@ public class Journal extends BookieCriticalThread implements CheckpointSource {
//Retain entry until it gets written to journal
entry.retain();
- queue.add(QueueEntry.create(entry, ledgerId, entryId, cb, ctx, MathUtils.nowInNano(), journalAddEntryStats));
+ queue.add(QueueEntry.create(
+ entry, ledgerId, entryId, cb, ctx, MathUtils.nowInNano(),
+ journalAddEntryStats, journalQueueSize));
}
/**
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/OpStatTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/OpStatTest.java
index 798f151..1d85dfe 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/OpStatTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/OpStatTest.java
@@ -21,6 +21,11 @@
package org.apache.bookkeeper.test;
+import static org.apache.bookkeeper.bookie.BookKeeperServerStats.BOOKIE_SCOPE;
+import static org.apache.bookkeeper.bookie.BookKeeperServerStats.JOURNAL_CB_QUEUE_SIZE;
+import static org.apache.bookkeeper.bookie.BookKeeperServerStats.JOURNAL_FORCE_WRITE_QUEUE_SIZE;
+import static org.apache.bookkeeper.bookie.BookKeeperServerStats.JOURNAL_QUEUE_SIZE;
+import static org.apache.bookkeeper.bookie.BookKeeperServerStats.JOURNAL_SCOPE;
import static org.apache.bookkeeper.bookie.BookKeeperServerStats.SERVER_SCOPE;
import static org.junit.Assert.assertTrue;
@@ -71,6 +76,19 @@ public class OpStatTest extends BookKeeperClusterTestCase {
}
}
+ private void validateNonMonotonicCounterGauge(TestStatsProvider stats, String path, BiConsumer<Long, Long> f) {
+ assertTrue(stats != null);
+ TestStatsProvider.TestCounter counter = stats.getCounter(path);
+ assertTrue(counter != null);
+ f.accept(counter.get(), counter.getMax());
+ }
+
+ private void validateNonMonotonicCounterGauges(TestStatsProvider stats, String paths[], BiConsumer<Long, Long> f) {
+ for (String path : paths) {
+ validateNonMonotonicCounterGauge(stats, path, f);
+ }
+ }
+
@Test
public void testTopLevelBookieWriteCounters() throws Exception {
long startNanos = MathUtils.nowInNano();
@@ -95,6 +113,13 @@ public class OpStatTest extends BookKeeperClusterTestCase {
assertTrue(average > 0);
assertTrue(average <= elapsed);
});
+ validateNonMonotonicCounterGauges(stats, new String[]{
+ BOOKIE_SCOPE + "." + JOURNAL_SCOPE + "_0." + JOURNAL_CB_QUEUE_SIZE,
+ BOOKIE_SCOPE + "." + JOURNAL_SCOPE + "_0." + JOURNAL_FORCE_WRITE_QUEUE_SIZE,
+ BOOKIE_SCOPE + "." + JOURNAL_SCOPE + "_0." + JOURNAL_QUEUE_SIZE
+ }, (value, max) -> {
+ assertTrue(max > 0);
+ });
}
@Test
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/TestStatsProvider.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/TestStatsProvider.java
index b71031a..ac6bfaf 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/TestStatsProvider.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/TestStatsProvider.java
@@ -43,6 +43,7 @@ public class TestStatsProvider implements StatsProvider {
*/
public class TestCounter implements Counter {
private AtomicLong val = new AtomicLong(0);
+ private AtomicLong max = new AtomicLong(0);
@Override
public void clear() {
@@ -51,7 +52,7 @@ public class TestStatsProvider implements StatsProvider {
@Override
public void inc() {
- val.incrementAndGet();
+ updateMax(val.incrementAndGet());
}
@Override
@@ -61,13 +62,29 @@ public class TestStatsProvider implements StatsProvider {
@Override
public void add(long delta) {
- val.addAndGet(delta);
+ updateMax(val.addAndGet(delta));
}
@Override
public Long get() {
return val.get();
}
+
+ private void updateMax(long newVal) {
+ while (true) {
+ long curMax = max.get();
+ if (curMax > newVal) {
+ break;
+ }
+ if (max.compareAndSet(curMax, newVal)) {
+ break;
+ }
+ }
+ }
+
+ public Long getMax() {
+ return max.get();
+ }
}
/**
--
To stop receiving notification emails like this one, please contact
['"commits@bookkeeper.apache.org" <co...@bookkeeper.apache.org>'].