You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@bookkeeper.apache.org by si...@apache.org on 2018/12/14 01:28:31 UTC
[bookkeeper] branch master updated: [STATS] [DOC] Add @StatsDoc
annotation for journal stats
This is an automated email from the ASF dual-hosted git repository.
sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git
The following commit(s) were added to refs/heads/master by this push:
new 7e047d9 [STATS] [DOC] Add @StatsDoc annotation for journal stats
7e047d9 is described below
commit 7e047d93fc9b4407667c7a5d858dcd88a046598f
Author: Sijie Guo <gu...@gmail.com>
AuthorDate: Fri Dec 14 09:28:26 2018 +0800
[STATS] [DOC] Add @StatsDoc annotation for journal stats
Descriptions of the changes in this PR:
*Motivation*
As part of [BP-36](https://github.com/apache/bookkeeper/issues/1785),
this PR is to document the journal stats.
*Changes*
- convert journal stats to use StatsDoc for documenting metrics
Master Issue: #1785
Reviewers: Jia Zhai <None>, Enrico Olivelli <eo...@gmail.com>
This closes #1870 from sijie/statsdoc_journal
---
.../java/org/apache/bookkeeper/bookie/Journal.java | 103 +++++------
.../bookkeeper/bookie/stats/JournalStats.java | 190 +++++++++++++++++++++
.../bookkeeper/bookie/BookieJournalForceTest.java | 4 +-
3 files changed, 232 insertions(+), 65 deletions(-)
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java
index 57f05c2..a868536 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java
@@ -46,6 +46,7 @@ import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import org.apache.bookkeeper.bookie.LedgerDirsManager.NoWritableLedgerDirException;
+import org.apache.bookkeeper.bookie.stats.JournalStats;
import org.apache.bookkeeper.common.collections.RecyclableArrayList;
import org.apache.bookkeeper.conf.ServerConfiguration;
import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.WriteCallback;
@@ -363,8 +364,9 @@ public class Journal extends BookieCriticalThread implements CheckpointSource {
private long enqueueTime;
public int process(boolean shouldForceWrite) throws IOException {
- forceWriteQueueSize.dec();
- fwEnqueueTimeStats.registerSuccessfulEvent(MathUtils.elapsedNanos(enqueueTime), TimeUnit.NANOSECONDS);
+ journalStats.getForceWriteQueueSize().dec();
+ journalStats.getFwEnqueueTimeStats()
+ .registerSuccessfulEvent(MathUtils.elapsedNanos(enqueueTime), TimeUnit.NANOSECONDS);
if (isMarker) {
return 0;
@@ -374,7 +376,8 @@ public class Journal extends BookieCriticalThread implements CheckpointSource {
if (shouldForceWrite) {
long startTime = MathUtils.nowInNano();
this.logFile.forceWrite(false);
- journalSyncStats.registerSuccessfulEvent(MathUtils.elapsedNanos(startTime), TimeUnit.NANOSECONDS);
+ journalStats.getJournalSyncStats()
+ .registerSuccessfulEvent(MathUtils.elapsedNanos(startTime), TimeUnit.NANOSECONDS);
}
lastLogMark.setCurLogMark(this.logId, this.lastFlushedPosition);
@@ -384,7 +387,7 @@ public class Journal extends BookieCriticalThread implements CheckpointSource {
if (qe != null) {
cbThreadPool.execute(qe);
}
- journalCbQueueSize.inc();
+ journalStats.getJournalCbQueueSize().inc();
}
return forceWriteWaiters.size();
@@ -438,7 +441,7 @@ public class Journal extends BookieCriticalThread implements CheckpointSource {
req.shouldClose = shouldClose;
req.isMarker = isMarker;
req.enqueueTime = MathUtils.nowInNano();
- forceWriteQueueSize.inc();
+ journalStats.getForceWriteQueueSize().inc();
return req;
}
@@ -490,7 +493,8 @@ public class Journal extends BookieCriticalThread implements CheckpointSource {
// the last force write and then reset the counter so we can accumulate
// requests in the write we are about to issue
if (numReqInLastForceWrite > 0) {
- forceWriteGroupingCountStats.registerSuccessfulValue(numReqInLastForceWrite);
+ journalStats.getForceWriteGroupingCountStats()
+ .registerSuccessfulValue(numReqInLastForceWrite);
numReqInLastForceWrite = 0;
}
}
@@ -614,24 +618,7 @@ public class Journal extends BookieCriticalThread implements CheckpointSource {
private final LedgerDirsManager ledgerDirsManager;
// Expose Stats
- private final OpStatsLogger journalAddEntryStats;
- private final OpStatsLogger journalForceLedgerStats;
- private final OpStatsLogger journalSyncStats;
- private final OpStatsLogger fwEnqueueTimeStats;
- private final OpStatsLogger journalCreationStats;
- private final OpStatsLogger journalFlushStats;
- private final OpStatsLogger journalProcessTimeStats;
- private final OpStatsLogger journalQueueStats;
- private final OpStatsLogger forceWriteGroupingCountStats;
- private final OpStatsLogger forceWriteBatchEntriesStats;
- private final OpStatsLogger forceWriteBatchBytesStats;
- private final Counter journalQueueSize;
- private final Counter forceWriteQueueSize;
- private final Counter journalCbQueueSize;
- private final Counter flushMaxWaitCounter;
- private final Counter flushMaxOutstandingBytesCounter;
- private final Counter flushEmptyQueueCounter;
- private final Counter journalWriteBytes;
+ private final JournalStats journalStats;
public Journal(int journalIndex, File journalDirectory, ServerConfiguration conf,
LedgerDirsManager ledgerDirsManager) {
@@ -679,27 +666,11 @@ public class Journal extends BookieCriticalThread implements CheckpointSource {
}
// Expose Stats
- journalAddEntryStats = statsLogger.getOpStatsLogger(BookKeeperServerStats.JOURNAL_ADD_ENTRY);
- journalForceLedgerStats = statsLogger.getOpStatsLogger(BookKeeperServerStats.JOURNAL_FORCE_LEDGER);
- journalSyncStats = statsLogger.getOpStatsLogger(BookKeeperServerStats.JOURNAL_SYNC);
- fwEnqueueTimeStats = statsLogger.getOpStatsLogger(BookKeeperServerStats.JOURNAL_FORCE_WRITE_ENQUEUE);
- journalCreationStats = statsLogger.getOpStatsLogger(BookKeeperServerStats.JOURNAL_CREATION_LATENCY);
- journalFlushStats = statsLogger.getOpStatsLogger(BookKeeperServerStats.JOURNAL_FLUSH_LATENCY);
- journalQueueStats = statsLogger.getOpStatsLogger(BookKeeperServerStats.JOURNAL_QUEUE_LATENCY);
- journalProcessTimeStats = statsLogger.getOpStatsLogger(BookKeeperServerStats.JOURNAL_PROCESS_TIME_LATENCY);
- forceWriteGroupingCountStats =
- statsLogger.getOpStatsLogger(BookKeeperServerStats.JOURNAL_FORCE_WRITE_GROUPING_COUNT);
- forceWriteBatchEntriesStats =
- statsLogger.getOpStatsLogger(BookKeeperServerStats.JOURNAL_FORCE_WRITE_BATCH_ENTRIES);
- forceWriteBatchBytesStats = statsLogger.getOpStatsLogger(BookKeeperServerStats.JOURNAL_FORCE_WRITE_BATCH_BYTES);
- journalQueueSize = statsLogger.getCounter(BookKeeperServerStats.JOURNAL_QUEUE_SIZE);
- forceWriteQueueSize = statsLogger.getCounter(BookKeeperServerStats.JOURNAL_FORCE_WRITE_QUEUE_SIZE);
- journalCbQueueSize = statsLogger.getCounter(BookKeeperServerStats.JOURNAL_CB_QUEUE_SIZE);
- flushMaxWaitCounter = statsLogger.getCounter(BookKeeperServerStats.JOURNAL_NUM_FLUSH_MAX_WAIT);
- flushMaxOutstandingBytesCounter =
- statsLogger.getCounter(BookKeeperServerStats.JOURNAL_NUM_FLUSH_MAX_OUTSTANDING_BYTES);
- flushEmptyQueueCounter = statsLogger.getCounter(BookKeeperServerStats.JOURNAL_NUM_FLUSH_EMPTY_QUEUE);
- journalWriteBytes = statsLogger.getCounter(BookKeeperServerStats.JOURNAL_WRITE_BYTES);
+ this.journalStats = new JournalStats(statsLogger);
+ }
+
+ JournalStats getJournalStats() {
+ return this.journalStats;
}
public File getJournalDirectory() {
@@ -891,18 +862,20 @@ public class Journal extends BookieCriticalThread implements CheckpointSource {
//Retain entry until it gets written to journal
entry.retain();
- journalQueueSize.inc();
+ journalStats.getJournalQueueSize().inc();
queue.put(QueueEntry.create(
entry, ackBeforeSync, ledgerId, entryId, cb, ctx, MathUtils.nowInNano(),
- journalAddEntryStats, journalQueueSize));
+ journalStats.getJournalAddEntryStats(),
+ journalStats.getJournalQueueSize()));
}
void forceLedger(long ledgerId, WriteCallback cb, Object ctx) {
- journalQueueSize.inc();
+ journalStats.getJournalQueueSize().inc();
queue.add(QueueEntry.create(
null, false /* ackBeforeSync */, ledgerId,
Bookie.METAENTRY_ID_FORCE_LEDGER, cb, ctx, MathUtils.nowInNano(),
- journalForceLedgerStats, journalQueueSize));
+ journalStats.getJournalForceLedgerStats(),
+ journalStats.getJournalQueueSize()));
}
/**
@@ -968,7 +941,7 @@ public class Journal extends BookieCriticalThread implements CheckpointSource {
journalAlignmentSize, removePagesFromCache,
journalFormatVersionToWrite, getBufferedChannelBuilder());
- journalCreationStats.registerSuccessfulEvent(
+ journalStats.getJournalCreationStats().registerSuccessfulEvent(
journalCreationWatcher.stop().elapsed(TimeUnit.NANOSECONDS), TimeUnit.NANOSECONDS);
bc = logFile.getBufferedChannel();
@@ -978,15 +951,15 @@ public class Journal extends BookieCriticalThread implements CheckpointSource {
if (qe == null) {
if (dequeueStartTime != 0) {
- journalProcessTimeStats.registerSuccessfulEvent(MathUtils.elapsedNanos(dequeueStartTime),
- TimeUnit.NANOSECONDS);
+ journalStats.getJournalProcessTimeStats()
+ .registerSuccessfulEvent(MathUtils.elapsedNanos(dequeueStartTime), TimeUnit.NANOSECONDS);
}
if (numEntriesToFlush == 0) {
qe = queue.take();
dequeueStartTime = MathUtils.nowInNano();
- journalQueueStats.registerSuccessfulEvent(MathUtils.elapsedNanos(qe.enqueueTime),
- TimeUnit.NANOSECONDS);
+ journalStats.getJournalQueueStats()
+ .registerSuccessfulEvent(MathUtils.elapsedNanos(qe.enqueueTime), TimeUnit.NANOSECONDS);
} else {
long pollWaitTimeNanos = maxGroupWaitInNanos
- MathUtils.elapsedNanos(toFlush.get(0).enqueueTime);
@@ -997,8 +970,8 @@ public class Journal extends BookieCriticalThread implements CheckpointSource {
dequeueStartTime = MathUtils.nowInNano();
if (qe != null) {
- journalQueueStats.registerSuccessfulEvent(MathUtils.elapsedNanos(qe.enqueueTime),
- TimeUnit.NANOSECONDS);
+ journalStats.getJournalQueueStats()
+ .registerSuccessfulEvent(MathUtils.elapsedNanos(qe.enqueueTime), TimeUnit.NANOSECONDS);
}
boolean shouldFlush = false;
@@ -1017,20 +990,20 @@ public class Journal extends BookieCriticalThread implements CheckpointSource {
// b) limit the number of entries to group
groupWhenTimeout = false;
shouldFlush = true;
- flushMaxWaitCounter.inc();
+ journalStats.getFlushMaxWaitCounter().inc();
} else if (qe != null
&& ((bufferedEntriesThreshold > 0 && toFlush.size() > bufferedEntriesThreshold)
|| (bc.position() > lastFlushPosition + bufferedWritesThreshold))) {
// 2. If we have buffered more than the buffWriteThreshold or bufferedEntriesThreshold
shouldFlush = true;
- flushMaxOutstandingBytesCounter.inc();
+ journalStats.getFlushMaxOutstandingBytesCounter().inc();
} else if (qe == null) {
// We should get here only if flushWhenQueueEmpty is true; otherwise we would wait
// for a timeout that would put us past the maxWait threshold
// 3. If the queue is empty i.e. no benefit of grouping. This happens when we have one
// publish at a time - common case in tests.
shouldFlush = true;
- flushEmptyQueueCounter.inc();
+ journalStats.getFlushEmptyQueueCounter().inc();
}
// toFlush is non null and not empty so should be safe to access getFirst
@@ -1051,7 +1024,7 @@ public class Journal extends BookieCriticalThread implements CheckpointSource {
}
lastFlushPosition = bc.position();
- journalFlushStats.registerSuccessfulEvent(
+ journalStats.getJournalFlushStats().registerSuccessfulEvent(
journalFlushWatcher.stop().elapsed(TimeUnit.NANOSECONDS), TimeUnit.NANOSECONDS);
// Trace the lifetime of entries through persistence
@@ -1064,8 +1037,10 @@ public class Journal extends BookieCriticalThread implements CheckpointSource {
}
}
- forceWriteBatchEntriesStats.registerSuccessfulValue(numEntriesToFlush);
- forceWriteBatchBytesStats.registerSuccessfulValue(batchSize);
+ journalStats.getForceWriteBatchEntriesStats()
+ .registerSuccessfulValue(numEntriesToFlush);
+ journalStats.getForceWriteBatchBytesStats()
+ .registerSuccessfulValue(batchSize);
boolean shouldRolloverJournal = (lastFlushPosition > maxJournalSize);
if (syncData) {
@@ -1120,8 +1095,8 @@ public class Journal extends BookieCriticalThread implements CheckpointSource {
qe.entry.release();
} else if (qe.entryId != Bookie.METAENTRY_ID_FORCE_LEDGER) {
int entrySize = qe.entry.readableBytes();
- journalWriteBytes.add(entrySize);
- journalQueueSize.dec();
+ journalStats.getJournalWriteBytes().add(entrySize);
+ journalStats.getJournalQueueSize().dec();
batchSize += (4 + entrySize);
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/stats/JournalStats.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/stats/JournalStats.java
new file mode 100644
index 0000000..7c7f7e4
--- /dev/null
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/stats/JournalStats.java
@@ -0,0 +1,190 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.bookkeeper.bookie.stats;
+
+import static org.apache.bookkeeper.bookie.BookKeeperServerStats.ADD_ENTRY;
+import static org.apache.bookkeeper.bookie.BookKeeperServerStats.CATEGORY_SERVER;
+import static org.apache.bookkeeper.bookie.BookKeeperServerStats.FORCE_LEDGER;
+import static org.apache.bookkeeper.bookie.BookKeeperServerStats.JOURNAL_ADD_ENTRY;
+import static org.apache.bookkeeper.bookie.BookKeeperServerStats.JOURNAL_CB_QUEUE_SIZE;
+import static org.apache.bookkeeper.bookie.BookKeeperServerStats.JOURNAL_CREATION_LATENCY;
+import static org.apache.bookkeeper.bookie.BookKeeperServerStats.JOURNAL_FLUSH_LATENCY;
+import static org.apache.bookkeeper.bookie.BookKeeperServerStats.JOURNAL_FORCE_LEDGER;
+import static org.apache.bookkeeper.bookie.BookKeeperServerStats.JOURNAL_FORCE_WRITE_BATCH_BYTES;
+import static org.apache.bookkeeper.bookie.BookKeeperServerStats.JOURNAL_FORCE_WRITE_BATCH_ENTRIES;
+import static org.apache.bookkeeper.bookie.BookKeeperServerStats.JOURNAL_FORCE_WRITE_ENQUEUE;
+import static org.apache.bookkeeper.bookie.BookKeeperServerStats.JOURNAL_FORCE_WRITE_GROUPING_COUNT;
+import static org.apache.bookkeeper.bookie.BookKeeperServerStats.JOURNAL_FORCE_WRITE_QUEUE_SIZE;
+import static org.apache.bookkeeper.bookie.BookKeeperServerStats.JOURNAL_NUM_FLUSH_EMPTY_QUEUE;
+import static org.apache.bookkeeper.bookie.BookKeeperServerStats.JOURNAL_NUM_FLUSH_MAX_OUTSTANDING_BYTES;
+import static org.apache.bookkeeper.bookie.BookKeeperServerStats.JOURNAL_NUM_FLUSH_MAX_WAIT;
+import static org.apache.bookkeeper.bookie.BookKeeperServerStats.JOURNAL_PROCESS_TIME_LATENCY;
+import static org.apache.bookkeeper.bookie.BookKeeperServerStats.JOURNAL_QUEUE_LATENCY;
+import static org.apache.bookkeeper.bookie.BookKeeperServerStats.JOURNAL_QUEUE_SIZE;
+import static org.apache.bookkeeper.bookie.BookKeeperServerStats.JOURNAL_SCOPE;
+import static org.apache.bookkeeper.bookie.BookKeeperServerStats.JOURNAL_SYNC;
+import static org.apache.bookkeeper.bookie.BookKeeperServerStats.JOURNAL_WRITE_BYTES;
+
+import lombok.Getter;
+import org.apache.bookkeeper.bookie.BookKeeperServerStats;
+import org.apache.bookkeeper.stats.Counter;
+import org.apache.bookkeeper.stats.OpStatsLogger;
+import org.apache.bookkeeper.stats.StatsLogger;
+import org.apache.bookkeeper.stats.annotations.StatsDoc;
+
+/**
+ * An umbrella class for journal-related stats.
+ */
+@StatsDoc(
+ name = JOURNAL_SCOPE,
+ category = CATEGORY_SERVER,
+ help = "Journal related stats"
+)
+@Getter
+public class JournalStats {
+
+ @StatsDoc(
+ name = JOURNAL_ADD_ENTRY,
+ help = "operation stats of recording addEntry requests in the journal",
+ parent = ADD_ENTRY
+ )
+ private final OpStatsLogger journalAddEntryStats;
+ @StatsDoc(
+ name = JOURNAL_FORCE_LEDGER,
+ help = "operation stats of recording forceLedger requests in the journal",
+ parent = FORCE_LEDGER
+ )
+ private final OpStatsLogger journalForceLedgerStats;
+ @StatsDoc(
+ name = JOURNAL_SYNC,
+ help = "operation stats of syncing data to journal disks",
+ parent = JOURNAL_ADD_ENTRY,
+ happensAfter = JOURNAL_FORCE_WRITE_ENQUEUE
+ )
+ private final OpStatsLogger journalSyncStats;
+ @StatsDoc(
+ name = JOURNAL_FORCE_WRITE_ENQUEUE,
+ help = "operation stats of enqueueing force write requests to force write queue",
+ parent = JOURNAL_ADD_ENTRY,
+ happensAfter = JOURNAL_PROCESS_TIME_LATENCY
+ )
+ private final OpStatsLogger fwEnqueueTimeStats;
+ @StatsDoc(
+ name = JOURNAL_CREATION_LATENCY,
+ help = "operation stats of creating journal files",
+ parent = JOURNAL_PROCESS_TIME_LATENCY
+ )
+ private final OpStatsLogger journalCreationStats;
+ @StatsDoc(
+ name = JOURNAL_FLUSH_LATENCY,
+ help = "operation stats of flushing data from memory to filesystem (but not yet fsyncing to disks)",
+ parent = JOURNAL_PROCESS_TIME_LATENCY,
+ happensAfter = JOURNAL_CREATION_LATENCY
+ )
+ private final OpStatsLogger journalFlushStats;
+ @StatsDoc(
+ name = JOURNAL_PROCESS_TIME_LATENCY,
+ help = "operation stats of processing requests in a journal (from dequeue an item to finish processing it)",
+ parent = JOURNAL_ADD_ENTRY,
+ happensAfter = JOURNAL_QUEUE_LATENCY
+ )
+ private final OpStatsLogger journalProcessTimeStats;
+ @StatsDoc(
+ name = JOURNAL_QUEUE_LATENCY,
+ help = "operation stats of enqueuing requests to a journal",
+ parent = JOURNAL_ADD_ENTRY
+ )
+ private final OpStatsLogger journalQueueStats;
+ @StatsDoc(
+ name = JOURNAL_FORCE_WRITE_GROUPING_COUNT,
+ help = "The distribution of number of force write requests grouped in a force write"
+ )
+ private final OpStatsLogger forceWriteGroupingCountStats;
+ @StatsDoc(
+ name = JOURNAL_FORCE_WRITE_BATCH_ENTRIES,
+ help = "The distribution of number of entries grouped together into a force write request"
+ )
+ private final OpStatsLogger forceWriteBatchEntriesStats;
+ @StatsDoc(
+ name = JOURNAL_FORCE_WRITE_BATCH_BYTES,
+ help = "The distribution of number of bytes grouped together into a force write request"
+ )
+ private final OpStatsLogger forceWriteBatchBytesStats;
+ @StatsDoc(
+ name = JOURNAL_QUEUE_SIZE,
+ help = "The journal queue size"
+ )
+ private final Counter journalQueueSize;
+ @StatsDoc(
+ name = JOURNAL_FORCE_WRITE_QUEUE_SIZE,
+ help = "The force write queue size"
+ )
+ private final Counter forceWriteQueueSize;
+ @StatsDoc(
+ name = JOURNAL_CB_QUEUE_SIZE,
+ help = "The journal callback queue size"
+ )
+ private final Counter journalCbQueueSize;
+ @StatsDoc(
+ name = JOURNAL_NUM_FLUSH_MAX_WAIT,
+ help = "The number of journal flushes triggered by MAX_WAIT time"
+ )
+ private final Counter flushMaxWaitCounter;
+ @StatsDoc(
+ name = JOURNAL_NUM_FLUSH_MAX_OUTSTANDING_BYTES,
+ help = "The number of journal flushes triggered by MAX_OUTSTANDING_BYTES"
+ )
+ private final Counter flushMaxOutstandingBytesCounter;
+ @StatsDoc(
+ name = JOURNAL_NUM_FLUSH_EMPTY_QUEUE,
+ help = "The number of journal flushes triggered when journal queue becomes empty"
+ )
+ private final Counter flushEmptyQueueCounter;
+ @StatsDoc(
+ name = JOURNAL_WRITE_BYTES,
+ help = "The number of bytes appended to the journal"
+ )
+ private final Counter journalWriteBytes;
+
+ public JournalStats(StatsLogger statsLogger) {
+ journalAddEntryStats = statsLogger.getOpStatsLogger(BookKeeperServerStats.JOURNAL_ADD_ENTRY);
+ journalForceLedgerStats = statsLogger.getOpStatsLogger(BookKeeperServerStats.JOURNAL_FORCE_LEDGER);
+ journalSyncStats = statsLogger.getOpStatsLogger(BookKeeperServerStats.JOURNAL_SYNC);
+ fwEnqueueTimeStats = statsLogger.getOpStatsLogger(BookKeeperServerStats.JOURNAL_FORCE_WRITE_ENQUEUE);
+ journalCreationStats = statsLogger.getOpStatsLogger(BookKeeperServerStats.JOURNAL_CREATION_LATENCY);
+ journalFlushStats = statsLogger.getOpStatsLogger(BookKeeperServerStats.JOURNAL_FLUSH_LATENCY);
+ journalQueueStats = statsLogger.getOpStatsLogger(BookKeeperServerStats.JOURNAL_QUEUE_LATENCY);
+ journalProcessTimeStats = statsLogger.getOpStatsLogger(BookKeeperServerStats.JOURNAL_PROCESS_TIME_LATENCY);
+ forceWriteGroupingCountStats =
+ statsLogger.getOpStatsLogger(BookKeeperServerStats.JOURNAL_FORCE_WRITE_GROUPING_COUNT);
+ forceWriteBatchEntriesStats =
+ statsLogger.getOpStatsLogger(BookKeeperServerStats.JOURNAL_FORCE_WRITE_BATCH_ENTRIES);
+ forceWriteBatchBytesStats = statsLogger.getOpStatsLogger(BookKeeperServerStats.JOURNAL_FORCE_WRITE_BATCH_BYTES);
+ journalQueueSize = statsLogger.getCounter(BookKeeperServerStats.JOURNAL_QUEUE_SIZE);
+ forceWriteQueueSize = statsLogger.getCounter(BookKeeperServerStats.JOURNAL_FORCE_WRITE_QUEUE_SIZE);
+ journalCbQueueSize = statsLogger.getCounter(BookKeeperServerStats.JOURNAL_CB_QUEUE_SIZE);
+ flushMaxWaitCounter = statsLogger.getCounter(BookKeeperServerStats.JOURNAL_NUM_FLUSH_MAX_WAIT);
+ flushMaxOutstandingBytesCounter =
+ statsLogger.getCounter(BookKeeperServerStats.JOURNAL_NUM_FLUSH_MAX_OUTSTANDING_BYTES);
+ flushEmptyQueueCounter = statsLogger.getCounter(BookKeeperServerStats.JOURNAL_NUM_FLUSH_EMPTY_QUEUE);
+ journalWriteBytes = statsLogger.getCounter(BookKeeperServerStats.JOURNAL_WRITE_BYTES);
+ }
+
+}
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BookieJournalForceTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BookieJournalForceTest.java
index 76daa7d..3ac507c 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BookieJournalForceTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BookieJournalForceTest.java
@@ -42,6 +42,7 @@ import java.util.concurrent.TimeUnit;
import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.bookie.Journal.ForceWriteRequest;
import org.apache.bookkeeper.bookie.Journal.LastLogMark;
+import org.apache.bookkeeper.bookie.stats.JournalStats;
import org.apache.bookkeeper.conf.ServerConfiguration;
import org.apache.bookkeeper.conf.TestBKConfiguration;
import org.apache.bookkeeper.net.BookieSocketAddress;
@@ -210,10 +211,11 @@ public class BookieJournalForceTest {
CountDownLatch forceWriteThreadSuspendedLatch = new CountDownLatch(1);
enableForceWriteThreadSuspension(forceWriteThreadSuspendedLatch, journal);
+ JournalStats journalStats = journal.getJournalStats();
TestStatsProvider testStatsProvider = new TestStatsProvider();
Counter flushMaxOutstandingBytesCounter = testStatsProvider.getStatsLogger("test")
.getCounter("flushMaxOutstandingBytesCounter");
- Whitebox.setInternalState(journal, "flushMaxOutstandingBytesCounter", flushMaxOutstandingBytesCounter);
+ Whitebox.setInternalState(journalStats, "flushMaxOutstandingBytesCounter", flushMaxOutstandingBytesCounter);
journal.start();