You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@bookkeeper.apache.org by si...@apache.org on 2018/11/29 02:41:38 UTC
[bookkeeper] branch master updated: [STATS] Add @StatsDoc annotation for bookie server request stats
This is an automated email from the ASF dual-hosted git repository.
sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git
The following commit(s) were added to refs/heads/master by this push:
new 8979e9e [STATS] Add @StatsDoc annotation for bookie server request stats
8979e9e is described below
commit 8979e9e0dc7f1c7e6bae2636c508ca529593e17a
Author: Sijie Guo <gu...@gmail.com>
AuthorDate: Wed Nov 28 18:41:34 2018 -0800
[STATS] Add @StatsDoc annotation for bookie server request stats
Descriptions of the changes in this PR:
*Motivation*
As part of [BP-36](https://github.com/apache/bookkeeper/issues/1785), this PR documents the request stats
of the BookKeeper server.
*Changes*
- add `parent` and `happensAfter` attributes to the StatsDoc annotation
- convert bookie request stats to use StatsDoc for documenting metrics
Master Issue: #1785
Reviewers: Jia Zhai <None>
This closes #1839 from sijie/bp36_add_parent_and_happensafter
---
.../bookkeeper/bookie/stats/BookieStats.java | 14 +-
.../bookkeeper/proto/BookieRequestProcessor.java | 185 ++---------
.../bookkeeper/proto/ForceLedgerProcessorV3.java | 11 +-
.../bookkeeper/proto/GetBookieInfoProcessorV3.java | 10 +-
.../proto/LongPollReadEntryProcessorV3.java | 6 +-
.../bookkeeper/proto/PacketProcessorBase.java | 2 +-
.../bookkeeper/proto/PacketProcessorBaseV3.java | 6 +-
.../bookkeeper/proto/ReadEntryProcessor.java | 13 +-
.../bookkeeper/proto/ReadEntryProcessorV3.java | 20 +-
.../bookkeeper/proto/ReadLacProcessorV3.java | 10 +-
.../org/apache/bookkeeper/proto/RequestStats.java | 342 +++++++++++++++++++++
.../bookkeeper/proto/WriteEntryProcessor.java | 18 +-
.../bookkeeper/proto/WriteEntryProcessorV3.java | 12 +-
.../bookkeeper/proto/WriteLacProcessorV3.java | 19 +-
.../proto/ForceLedgerProcessorV3Test.java | 5 +-
.../bookkeeper/proto/WriteEntryProcessorTest.java | 5 +-
.../proto/WriteEntryProcessorV3Test.java | 7 +-
.../bookkeeper/stats/annotations/StatsDoc.java | 20 ++
18 files changed, 470 insertions(+), 235 deletions(-)
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/stats/BookieStats.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/stats/BookieStats.java
index 72921d7..5e033e9 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/stats/BookieStats.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/stats/BookieStats.java
@@ -19,6 +19,7 @@
package org.apache.bookkeeper.bookie.stats;
+import static org.apache.bookkeeper.bookie.BookKeeperServerStats.ADD_ENTRY;
import static org.apache.bookkeeper.bookie.BookKeeperServerStats.BOOKIE_ADD_ENTRY;
import static org.apache.bookkeeper.bookie.BookKeeperServerStats.BOOKIE_ADD_ENTRY_BYTES;
import static org.apache.bookkeeper.bookie.BookKeeperServerStats.BOOKIE_FORCE_LEDGER;
@@ -28,6 +29,7 @@ import static org.apache.bookkeeper.bookie.BookKeeperServerStats.BOOKIE_RECOVERY
import static org.apache.bookkeeper.bookie.BookKeeperServerStats.BOOKIE_SCOPE;
import static org.apache.bookkeeper.bookie.BookKeeperServerStats.CATEGORY_SERVER;
import static org.apache.bookkeeper.bookie.BookKeeperServerStats.READ_BYTES;
+import static org.apache.bookkeeper.bookie.BookKeeperServerStats.READ_ENTRY;
import static org.apache.bookkeeper.bookie.BookKeeperServerStats.WRITE_BYTES;
import lombok.Getter;
@@ -56,11 +58,19 @@ public class BookieStats {
@StatsDoc(name = BOOKIE_FORCE_LEDGER, help = "total force operations occurred on a bookie")
private final Counter forceLedgerOps;
// Bookie Operation Latency Stats
- @StatsDoc(name = BOOKIE_ADD_ENTRY, help = "operations stats of AddEntry on a bookie")
+ @StatsDoc(
+ name = BOOKIE_ADD_ENTRY,
+ help = "operations stats of AddEntry on a bookie",
+ parent = ADD_ENTRY
+ )
private final OpStatsLogger addEntryStats;
@StatsDoc(name = BOOKIE_RECOVERY_ADD_ENTRY, help = "operation stats of RecoveryAddEntry on a bookie")
private final OpStatsLogger recoveryAddEntryStats;
- @StatsDoc(name = BOOKIE_READ_ENTRY, help = "operation stats of ReadEntry on a bookie")
+ @StatsDoc(
+ name = BOOKIE_READ_ENTRY,
+ help = "operation stats of ReadEntry on a bookie",
+ parent = READ_ENTRY
+ )
private final OpStatsLogger readEntryStats;
// Bookie Operation Bytes Stats
@StatsDoc(name = BOOKIE_ADD_ENTRY_BYTES, help = "bytes stats of AddEntry on a bookie")
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieRequestProcessor.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieRequestProcessor.java
index 78b35ec..b883f74 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieRequestProcessor.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieRequestProcessor.java
@@ -21,34 +21,6 @@
package org.apache.bookkeeper.proto;
import static com.google.common.base.Preconditions.checkArgument;
-import static org.apache.bookkeeper.bookie.BookKeeperServerStats.ADD_ENTRY;
-import static org.apache.bookkeeper.bookie.BookKeeperServerStats.ADD_ENTRY_BLOCKED;
-import static org.apache.bookkeeper.bookie.BookKeeperServerStats.ADD_ENTRY_BLOCKED_WAIT;
-import static org.apache.bookkeeper.bookie.BookKeeperServerStats.ADD_ENTRY_IN_PROGRESS;
-import static org.apache.bookkeeper.bookie.BookKeeperServerStats.ADD_ENTRY_REQUEST;
-import static org.apache.bookkeeper.bookie.BookKeeperServerStats.CHANNEL_WRITE;
-import static org.apache.bookkeeper.bookie.BookKeeperServerStats.FORCE_LEDGER;
-import static org.apache.bookkeeper.bookie.BookKeeperServerStats.FORCE_LEDGER_REQUEST;
-import static org.apache.bookkeeper.bookie.BookKeeperServerStats.GET_BOOKIE_INFO;
-import static org.apache.bookkeeper.bookie.BookKeeperServerStats.GET_BOOKIE_INFO_REQUEST;
-import static org.apache.bookkeeper.bookie.BookKeeperServerStats.READ_ENTRY;
-import static org.apache.bookkeeper.bookie.BookKeeperServerStats.READ_ENTRY_BLOCKED;
-import static org.apache.bookkeeper.bookie.BookKeeperServerStats.READ_ENTRY_BLOCKED_WAIT;
-import static org.apache.bookkeeper.bookie.BookKeeperServerStats.READ_ENTRY_FENCE_READ;
-import static org.apache.bookkeeper.bookie.BookKeeperServerStats.READ_ENTRY_FENCE_REQUEST;
-import static org.apache.bookkeeper.bookie.BookKeeperServerStats.READ_ENTRY_FENCE_WAIT;
-import static org.apache.bookkeeper.bookie.BookKeeperServerStats.READ_ENTRY_IN_PROGRESS;
-import static org.apache.bookkeeper.bookie.BookKeeperServerStats.READ_ENTRY_LONG_POLL_PRE_WAIT;
-import static org.apache.bookkeeper.bookie.BookKeeperServerStats.READ_ENTRY_LONG_POLL_READ;
-import static org.apache.bookkeeper.bookie.BookKeeperServerStats.READ_ENTRY_LONG_POLL_REQUEST;
-import static org.apache.bookkeeper.bookie.BookKeeperServerStats.READ_ENTRY_LONG_POLL_WAIT;
-import static org.apache.bookkeeper.bookie.BookKeeperServerStats.READ_ENTRY_REQUEST;
-import static org.apache.bookkeeper.bookie.BookKeeperServerStats.READ_ENTRY_SCHEDULING_DELAY;
-import static org.apache.bookkeeper.bookie.BookKeeperServerStats.READ_LAC;
-import static org.apache.bookkeeper.bookie.BookKeeperServerStats.READ_LAC_REQUEST;
-import static org.apache.bookkeeper.bookie.BookKeeperServerStats.READ_LAST_ENTRY_NOENTRY_ERROR;
-import static org.apache.bookkeeper.bookie.BookKeeperServerStats.WRITE_LAC;
-import static org.apache.bookkeeper.bookie.BookKeeperServerStats.WRITE_LAC_REQUEST;
import static org.apache.bookkeeper.proto.RequestUtils.hasFlag;
import com.google.common.annotations.VisibleForTesting;
@@ -68,7 +40,6 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
import lombok.AccessLevel;
@@ -80,9 +51,6 @@ import org.apache.bookkeeper.common.util.MathUtils;
import org.apache.bookkeeper.common.util.OrderedExecutor;
import org.apache.bookkeeper.conf.ServerConfiguration;
import org.apache.bookkeeper.processor.RequestProcessor;
-import org.apache.bookkeeper.stats.Counter;
-import org.apache.bookkeeper.stats.Gauge;
-import org.apache.bookkeeper.stats.OpStatsLogger;
import org.apache.bookkeeper.stats.StatsLogger;
import org.apache.bookkeeper.tls.SecurityException;
import org.apache.bookkeeper.tls.SecurityHandlerFactory;
@@ -146,37 +114,8 @@ public class BookieRequestProcessor implements RequestProcessor {
// Expose Stats
private final BKStats bkStats = BKStats.getInstance();
private final boolean statsEnabled;
- private final OpStatsLogger addRequestStats;
- private final OpStatsLogger addEntryStats;
- final OpStatsLogger readRequestStats;
- final OpStatsLogger readEntryStats;
- final OpStatsLogger forceLedgerStats;
- final OpStatsLogger forceLedgerRequestStats;
- final OpStatsLogger fenceReadRequestStats;
- final OpStatsLogger fenceReadEntryStats;
- final OpStatsLogger fenceReadWaitStats;
- final OpStatsLogger readEntrySchedulingDelayStats;
- final OpStatsLogger longPollPreWaitStats;
- final OpStatsLogger longPollWaitStats;
- final OpStatsLogger longPollReadStats;
- final OpStatsLogger longPollReadRequestStats;
- final Counter readLastEntryNoEntryErrorCounter;
- final OpStatsLogger writeLacRequestStats;
- final OpStatsLogger writeLacStats;
- final OpStatsLogger readLacRequestStats;
- final OpStatsLogger readLacStats;
- final OpStatsLogger getBookieInfoRequestStats;
- final OpStatsLogger getBookieInfoStats;
- final OpStatsLogger channelWriteStats;
- final OpStatsLogger addEntryBlockedStats;
- final OpStatsLogger readEntryBlockedStats;
-
- final AtomicInteger addsInProgress = new AtomicInteger(0);
- final AtomicInteger maxAddsInProgress = new AtomicInteger(0);
- final AtomicInteger addsBlocked = new AtomicInteger(0);
- final AtomicInteger readsInProgress = new AtomicInteger(0);
- final AtomicInteger readsBlocked = new AtomicInteger(0);
- final AtomicInteger maxReadsInProgress = new AtomicInteger(0);
+
+ private final RequestStats requestStats;
final Semaphore addsSemaphore;
final Semaphore readsSemaphore;
@@ -248,86 +187,13 @@ public class BookieRequestProcessor implements RequestProcessor {
// Expose Stats
this.statsEnabled = serverCfg.isStatisticsEnabled();
- this.addEntryStats = statsLogger.getOpStatsLogger(ADD_ENTRY);
- this.addRequestStats = statsLogger.getOpStatsLogger(ADD_ENTRY_REQUEST);
- this.readEntryStats = statsLogger.getOpStatsLogger(READ_ENTRY);
- this.forceLedgerStats = statsLogger.getOpStatsLogger(FORCE_LEDGER);
- this.forceLedgerRequestStats = statsLogger.getOpStatsLogger(FORCE_LEDGER_REQUEST);
- this.readRequestStats = statsLogger.getOpStatsLogger(READ_ENTRY_REQUEST);
- this.fenceReadEntryStats = statsLogger.getOpStatsLogger(READ_ENTRY_FENCE_READ);
- this.fenceReadRequestStats = statsLogger.getOpStatsLogger(READ_ENTRY_FENCE_REQUEST);
- this.fenceReadWaitStats = statsLogger.getOpStatsLogger(READ_ENTRY_FENCE_WAIT);
- this.readEntrySchedulingDelayStats = statsLogger.getOpStatsLogger(READ_ENTRY_SCHEDULING_DELAY);
- this.longPollPreWaitStats = statsLogger.getOpStatsLogger(READ_ENTRY_LONG_POLL_PRE_WAIT);
- this.longPollWaitStats = statsLogger.getOpStatsLogger(READ_ENTRY_LONG_POLL_WAIT);
- this.longPollReadStats = statsLogger.getOpStatsLogger(READ_ENTRY_LONG_POLL_READ);
- this.longPollReadRequestStats = statsLogger.getOpStatsLogger(READ_ENTRY_LONG_POLL_REQUEST);
- this.readLastEntryNoEntryErrorCounter = statsLogger.getCounter(READ_LAST_ENTRY_NOENTRY_ERROR);
- this.writeLacStats = statsLogger.getOpStatsLogger(WRITE_LAC);
- this.writeLacRequestStats = statsLogger.getOpStatsLogger(WRITE_LAC_REQUEST);
- this.readLacStats = statsLogger.getOpStatsLogger(READ_LAC);
- this.readLacRequestStats = statsLogger.getOpStatsLogger(READ_LAC_REQUEST);
- this.getBookieInfoStats = statsLogger.getOpStatsLogger(GET_BOOKIE_INFO);
- this.getBookieInfoRequestStats = statsLogger.getOpStatsLogger(GET_BOOKIE_INFO_REQUEST);
- this.channelWriteStats = statsLogger.getOpStatsLogger(CHANNEL_WRITE);
-
- this.addEntryBlockedStats = statsLogger.getOpStatsLogger(ADD_ENTRY_BLOCKED_WAIT);
- this.readEntryBlockedStats = statsLogger.getOpStatsLogger(READ_ENTRY_BLOCKED_WAIT);
+ this.requestStats = new RequestStats(statsLogger);
int maxAdds = serverCfg.getMaxAddsInProgressLimit();
addsSemaphore = maxAdds > 0 ? new Semaphore(maxAdds, true) : null;
int maxReads = serverCfg.getMaxReadsInProgressLimit();
readsSemaphore = maxReads > 0 ? new Semaphore(maxReads, true) : null;
-
- statsLogger.registerGauge(ADD_ENTRY_IN_PROGRESS, new Gauge<Number>() {
- @Override
- public Number getDefaultValue() {
- return 0;
- }
-
- @Override
- public Number getSample() {
- return addsInProgress;
- }
- });
-
- statsLogger.registerGauge(ADD_ENTRY_BLOCKED, new Gauge<Number>() {
- @Override
- public Number getDefaultValue() {
- return 0;
- }
-
- @Override
- public Number getSample() {
- return addsBlocked;
- }
- });
-
- statsLogger.registerGauge(READ_ENTRY_IN_PROGRESS, new Gauge<Number>() {
- @Override
- public Number getDefaultValue() {
- return 0;
- }
-
- @Override
- public Number getSample() {
- return readsInProgress;
- }
- });
-
- statsLogger.registerGauge(READ_ENTRY_BLOCKED, new Gauge<Number>() {
- @Override
- public Number getDefaultValue() {
- return 0;
- }
-
- @Override
- public Number getSample() {
- return readsBlocked;
- }
- });
-
}
protected void onAddRequestStart(Channel channel) {
@@ -336,21 +202,19 @@ public class BookieRequestProcessor implements RequestProcessor {
final long throttlingStartTimeNanos = MathUtils.nowInNano();
channel.config().setAutoRead(false);
LOG.info("Too many add requests in progress, disabling autoread on channel {}", channel);
- addsBlocked.incrementAndGet();
+ requestStats.blockAddRequest();
addsSemaphore.acquireUninterruptibly();
channel.config().setAutoRead(true);
final long delayNanos = MathUtils.elapsedNanos(throttlingStartTimeNanos);
LOG.info("Re-enabled autoread on channel {} after AddRequest delay of {} nanos", channel, delayNanos);
- addEntryBlockedStats.registerSuccessfulEvent(delayNanos, TimeUnit.NANOSECONDS);
- addsBlocked.decrementAndGet();
+ requestStats.unblockAddRequest(delayNanos);
}
}
- final int curr = addsInProgress.incrementAndGet();
- maxAddsInProgress.accumulateAndGet(curr, Integer::max);
+ requestStats.trackAddRequest();
}
protected void onAddRequestFinish() {
- addsInProgress.decrementAndGet();
+ requestStats.untrackAddRequest();
if (addsSemaphore != null) {
addsSemaphore.release();
}
@@ -362,21 +226,19 @@ public class BookieRequestProcessor implements RequestProcessor {
final long throttlingStartTimeNanos = MathUtils.nowInNano();
channel.config().setAutoRead(false);
LOG.info("Too many read requests in progress, disabling autoread on channel {}", channel);
- readsBlocked.incrementAndGet();
+ requestStats.blockReadRequest();
readsSemaphore.acquireUninterruptibly();
channel.config().setAutoRead(true);
final long delayNanos = MathUtils.elapsedNanos(throttlingStartTimeNanos);
LOG.info("Re-enabled autoread on channel {} after ReadRequest delay of {} nanos", channel, delayNanos);
- readEntryBlockedStats.registerSuccessfulEvent(delayNanos, TimeUnit.NANOSECONDS);
- readsBlocked.decrementAndGet();
+ requestStats.unblockReadRequest(delayNanos);
}
}
- final int curr = readsInProgress.incrementAndGet();
- maxReadsInProgress.accumulateAndGet(curr, Integer::max);
+ requestStats.trackReadRequest();
}
protected void onReadRequestFinish() {
- readsInProgress.decrementAndGet();
+ requestStats.untrackReadRequest();
if (readsSemaphore != null) {
readsSemaphore.release();
}
@@ -384,12 +246,12 @@ public class BookieRequestProcessor implements RequestProcessor {
@VisibleForTesting
int maxAddsInProgressCount() {
- return maxAddsInProgress.get();
+ return requestStats.maxAddsInProgressCount();
}
@VisibleForTesting
int maxReadsInProgressCount() {
- return maxReadsInProgress.get();
+ return requestStats.maxReadsInProgressCount();
}
@Override
@@ -576,7 +438,7 @@ public class BookieRequestProcessor implements RequestProcessor {
.setStatus(addResponse.getStatus())
.setAddResponse(addResponse);
BookkeeperProtocol.Response resp = response.build();
- write.sendResponse(addResponse.getStatus(), resp, addRequestStats);
+ write.sendResponse(addResponse.getStatus(), resp, requestStats.getAddRequestStats());
}
}
}
@@ -610,7 +472,10 @@ public class BookieRequestProcessor implements RequestProcessor {
.setStatus(forceLedgerResponse.getStatus())
.setForceLedgerResponse(forceLedgerResponse);
BookkeeperProtocol.Response resp = response.build();
- forceLedger.sendResponse(forceLedgerResponse.getStatus(), resp, forceLedgerRequestStats);
+ forceLedger.sendResponse(
+ forceLedgerResponse.getStatus(),
+ resp,
+ requestStats.getForceLedgerRequestStats());
}
}
}
@@ -660,7 +525,7 @@ public class BookieRequestProcessor implements RequestProcessor {
.setStatus(readResponse.getStatus())
.setReadResponse(readResponse);
BookkeeperProtocol.Response resp = response.build();
- read.sendResponse(readResponse.getStatus(), resp, readRequestStats);
+ read.sendResponse(readResponse.getStatus(), resp, requestStats.getReadRequestStats());
}
}
}
@@ -740,8 +605,10 @@ public class BookieRequestProcessor implements RequestProcessor {
r.entryId);
}
- write.sendResponse(BookieProtocol.ETOOMANYREQUESTS,
- ResponseBuilder.buildErrorResponse(BookieProtocol.ETOOMANYREQUESTS, r), addRequestStats);
+ write.sendResponse(
+ BookieProtocol.ETOOMANYREQUESTS,
+ ResponseBuilder.buildErrorResponse(BookieProtocol.ETOOMANYREQUESTS, r),
+ requestStats.getAddRequestStats());
}
}
}
@@ -770,8 +637,10 @@ public class BookieRequestProcessor implements RequestProcessor {
r.entryId);
}
- read.sendResponse(BookieProtocol.ETOOMANYREQUESTS,
- ResponseBuilder.buildErrorResponse(BookieProtocol.ETOOMANYREQUESTS, r), readRequestStats);
+ read.sendResponse(
+ BookieProtocol.ETOOMANYREQUESTS,
+ ResponseBuilder.buildErrorResponse(BookieProtocol.ETOOMANYREQUESTS, r),
+ requestStats.getReadRequestStats());
}
}
}
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ForceLedgerProcessorV3.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ForceLedgerProcessorV3.java
index 0c8ef01..f889172 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ForceLedgerProcessorV3.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ForceLedgerProcessorV3.java
@@ -67,11 +67,11 @@ class ForceLedgerProcessorV3 extends PacketProcessorBaseV3 implements Runnable {
"ledgerId must be {} but was {}", ledgerId, ledgerId1);
if (BookieProtocol.EOK == rc) {
- requestProcessor.getForceLedgerStats()
+ requestProcessor.getRequestStats().getForceLedgerStats()
.registerSuccessfulEvent(MathUtils.elapsedNanos(startTimeNanos),
TimeUnit.NANOSECONDS);
} else {
- requestProcessor.getForceLedgerStats()
+ requestProcessor.getRequestStats().getForceLedgerStats()
.registerFailedEvent(MathUtils.elapsedNanos(startTimeNanos),
TimeUnit.NANOSECONDS);
}
@@ -94,7 +94,7 @@ class ForceLedgerProcessorV3 extends PacketProcessorBaseV3 implements Runnable {
.setStatus(forceLedgerResponse.getStatus())
.setForceLedgerResponse(forceLedgerResponse);
Response resp = response.build();
- sendResponse(status, resp, requestProcessor.getForceLedgerRequestStats());
+ sendResponse(status, resp, requestProcessor.getRequestStats().getForceLedgerRequestStats());
};
StatusCode status = null;
try {
@@ -124,7 +124,10 @@ class ForceLedgerProcessorV3 extends PacketProcessorBaseV3 implements Runnable {
.setStatus(forceLedgerResponse.getStatus())
.setForceLedgerResponse(forceLedgerResponse);
Response resp = response.build();
- sendResponse(forceLedgerResponse.getStatus(), resp, requestProcessor.getForceLedgerRequestStats());
+ sendResponse(
+ forceLedgerResponse.getStatus(),
+ resp,
+ requestProcessor.getRequestStats().getForceLedgerRequestStats());
}
}
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/GetBookieInfoProcessorV3.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/GetBookieInfoProcessorV3.java
index d964957..fe315f6 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/GetBookieInfoProcessorV3.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/GetBookieInfoProcessorV3.java
@@ -54,8 +54,8 @@ public class GetBookieInfoProcessorV3 extends PacketProcessorBaseV3 implements R
if (!isVersionCompatible()) {
getBookieInfoResponse.setStatus(StatusCode.EBADVERSION);
- requestProcessor.getGetBookieInfoStats().registerFailedEvent(MathUtils.elapsedNanos(startTimeNanos),
- TimeUnit.NANOSECONDS);
+ requestProcessor.getRequestStats().getGetBookieInfoStats()
+ .registerFailedEvent(MathUtils.elapsedNanos(startTimeNanos), TimeUnit.NANOSECONDS);
return getBookieInfoResponse.build();
}
@@ -80,8 +80,8 @@ public class GetBookieInfoProcessorV3 extends PacketProcessorBaseV3 implements R
}
getBookieInfoResponse.setStatus(status);
- requestProcessor.getGetBookieInfoStats().registerSuccessfulEvent(MathUtils.elapsedNanos(startTimeNanos),
- TimeUnit.NANOSECONDS);
+ requestProcessor.getRequestStats().getGetBookieInfoStats()
+ .registerSuccessfulEvent(MathUtils.elapsedNanos(startTimeNanos), TimeUnit.NANOSECONDS);
return getBookieInfoResponse.build();
}
@@ -98,6 +98,6 @@ public class GetBookieInfoProcessorV3 extends PacketProcessorBaseV3 implements R
.setGetBookieInfoResponse(getBookieInfoResponse);
sendResponse(response.getStatus(),
response.build(),
- requestProcessor.getGetBookieInfoRequestStats());
+ requestProcessor.getRequestStats().getGetBookieInfoRequestStats());
}
}
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/LongPollReadEntryProcessorV3.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/LongPollReadEntryProcessorV3.java
index fdbbd35..6f25d68 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/LongPollReadEntryProcessorV3.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/LongPollReadEntryProcessorV3.java
@@ -101,7 +101,7 @@ class LongPollReadEntryProcessorV3 extends ReadEntryProcessorV3 implements Watch
try {
return super.readEntry(readResponseBuilder, entryId, true, startTimeSw);
} catch (Bookie.NoEntryException e) {
- requestProcessor.readLastEntryNoEntryErrorCounter.inc();
+ requestProcessor.getRequestStats().getReadLastEntryNoEntryErrorCounter().inc();
logger.info(
"No entry found while piggyback reading entry {} from ledger {} : previous lac = {}",
entryId, ledgerId, previousLAC);
@@ -153,7 +153,7 @@ class LongPollReadEntryProcessorV3 extends ReadEntryProcessorV3 implements Watch
return buildErrorResponse(StatusCode.EIO, startTimeSw);
}
- registerSuccessfulEvent(requestProcessor.longPollPreWaitStats, startTimeSw);
+ registerSuccessfulEvent(requestProcessor.getRequestStats().getLongPollPreWaitStats(), startTimeSw);
lastPhaseStartTime.reset().start();
if (watched) {
@@ -213,7 +213,7 @@ class LongPollReadEntryProcessorV3 extends ReadEntryProcessorV3 implements Watch
expirationTimerTask.cancel();
}
- registerEvent(timeout, requestProcessor.longPollWaitStats, lastPhaseStartTime);
+ registerEvent(timeout, requestProcessor.getRequestStats().getLongPollWaitStats(), lastPhaseStartTime);
lastPhaseStartTime.reset().start();
}
}
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PacketProcessorBase.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PacketProcessorBase.java
index b7dee2d..54368a0 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PacketProcessorBase.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PacketProcessorBase.java
@@ -79,7 +79,7 @@ abstract class PacketProcessorBase<T extends Request> extends SafeRunnable {
if (!isVersionCompatible()) {
sendResponse(BookieProtocol.EBADVERSION,
ResponseBuilder.buildErrorResponse(BookieProtocol.EBADVERSION, request),
- requestProcessor.readRequestStats);
+ requestProcessor.getRequestStats().getReadRequestStats());
return;
}
processPacket();
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PacketProcessorBaseV3.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PacketProcessorBaseV3.java
index 7dc29a3..15765a2 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PacketProcessorBaseV3.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PacketProcessorBaseV3.java
@@ -79,7 +79,7 @@ public abstract class PacketProcessorBaseV3 extends SafeRunnable {
if (!channel.isWritable()) {
LOGGER.warn("cannot write response to non-writable channel {} for request {}", channel,
StringUtils.requestToString(request));
- requestProcessor.getChannelWriteStats()
+ requestProcessor.getRequestStats().getChannelWriteStats()
.registerFailedEvent(MathUtils.elapsedNanos(writeNanos), TimeUnit.NANOSECONDS);
statsLogger.registerFailedEvent(MathUtils.elapsedNanos(enqueueNanos), TimeUnit.NANOSECONDS);
return;
@@ -93,10 +93,10 @@ public abstract class PacketProcessorBaseV3 extends SafeRunnable {
public void operationComplete(ChannelFuture future) throws Exception {
long writeElapsedNanos = MathUtils.elapsedNanos(writeNanos);
if (!future.isSuccess()) {
- requestProcessor.getChannelWriteStats()
+ requestProcessor.getRequestStats().getChannelWriteStats()
.registerFailedEvent(writeElapsedNanos, TimeUnit.NANOSECONDS);
} else {
- requestProcessor.getChannelWriteStats()
+ requestProcessor.getRequestStats().getChannelWriteStats()
.registerSuccessfulEvent(writeElapsedNanos, TimeUnit.NANOSECONDS);
}
if (StatusCode.EOK == code) {
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ReadEntryProcessor.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ReadEntryProcessor.java
index edeb8a6..6566c7b 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ReadEntryProcessor.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ReadEntryProcessor.java
@@ -20,7 +20,6 @@ package org.apache.bookkeeper.proto;
import io.netty.buffer.ByteBuf;
import io.netty.channel.Channel;
import io.netty.util.Recycler;
-import io.netty.util.Recycler.Handle;
import io.netty.util.ReferenceCountUtil;
import java.io.IOException;
@@ -129,17 +128,17 @@ class ReadEntryProcessor extends PacketProcessorBase<ReadRequest> {
LOG.trace("Read entry rc = {} for {}", errorCode, request);
}
if (errorCode == BookieProtocol.EOK) {
- requestProcessor.readEntryStats.registerSuccessfulEvent(MathUtils.elapsedNanos(startTimeNanos),
- TimeUnit.NANOSECONDS);
+ requestProcessor.getRequestStats().getReadEntryStats()
+ .registerSuccessfulEvent(MathUtils.elapsedNanos(startTimeNanos), TimeUnit.NANOSECONDS);
sendResponse(errorCode, ResponseBuilder.buildReadResponse(data, request),
- requestProcessor.readRequestStats);
+ requestProcessor.getRequestStats().getReadRequestStats());
} else {
ReferenceCountUtil.release(data);
- requestProcessor.readEntryStats.registerFailedEvent(MathUtils.elapsedNanos(startTimeNanos),
- TimeUnit.NANOSECONDS);
+ requestProcessor.getRequestStats().getReadEntryStats()
+ .registerFailedEvent(MathUtils.elapsedNanos(startTimeNanos), TimeUnit.NANOSECONDS);
sendResponse(errorCode, ResponseBuilder.buildErrorResponse(errorCode, request),
- requestProcessor.readRequestStats);
+ requestProcessor.getRequestStats().getReadRequestStats());
}
recycle();
}
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ReadEntryProcessorV3.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ReadEntryProcessorV3.java
index e7e5653..88b7662 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ReadEntryProcessorV3.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ReadEntryProcessorV3.java
@@ -71,14 +71,14 @@ class ReadEntryProcessorV3 extends PacketProcessorBaseV3 {
this.ledgerId = readRequest.getLedgerId();
this.entryId = readRequest.getEntryId();
if (RequestUtils.isFenceRequest(this.readRequest)) {
- this.readStats = requestProcessor.fenceReadEntryStats;
- this.reqStats = requestProcessor.fenceReadRequestStats;
+ this.readStats = requestProcessor.getRequestStats().getFenceReadEntryStats();
+ this.reqStats = requestProcessor.getRequestStats().getFenceReadRequestStats();
} else if (readRequest.hasPreviousLAC()) {
- this.readStats = requestProcessor.longPollReadStats;
- this.reqStats = requestProcessor.longPollReadRequestStats;
+ this.readStats = requestProcessor.getRequestStats().getLongPollReadStats();
+ this.reqStats = requestProcessor.getRequestStats().getLongPollReadRequestStats();
} else {
- this.readStats = requestProcessor.readEntryStats;
- this.reqStats = requestProcessor.readRequestStats;
+ this.readStats = requestProcessor.getRequestStats().getReadEntryStats();
+ this.reqStats = requestProcessor.getRequestStats().getReadRequestStats();
}
this.fenceThreadPool = fenceThreadPool;
@@ -246,7 +246,7 @@ class ReadEntryProcessorV3 extends PacketProcessorBaseV3 {
@Override
public void safeRun() {
- requestProcessor.readEntrySchedulingDelayStats.registerSuccessfulEvent(
+ requestProcessor.getRequestStats().getReadEntrySchedulingDelayStats().registerSuccessfulEvent(
MathUtils.elapsedNanos(enqueueNanos), TimeUnit.NANOSECONDS);
if (!isVersionCompatible()) {
@@ -275,11 +275,11 @@ class ReadEntryProcessorV3 extends PacketProcessorBaseV3 {
StatusCode status;
if (!fenceResult) {
status = StatusCode.EIO;
- registerFailedEvent(requestProcessor.fenceReadWaitStats, lastPhaseStartTime);
+ registerFailedEvent(requestProcessor.getRequestStats().getFenceReadWaitStats(), lastPhaseStartTime);
} else {
status = StatusCode.EOK;
readResponse.setBody(ByteString.copyFrom(entryBody.nioBuffer()));
- registerSuccessfulEvent(requestProcessor.fenceReadWaitStats, lastPhaseStartTime);
+ registerSuccessfulEvent(requestProcessor.getRequestStats().getFenceReadWaitStats(), lastPhaseStartTime);
}
if (null != entryBody) {
@@ -296,7 +296,7 @@ class ReadEntryProcessorV3 extends PacketProcessorBaseV3 {
// build the fence read response
getFenceResponse(readResponse, entryBody, fenceResult);
// register fence read stat
- registerEvent(!fenceResult, requestProcessor.fenceReadEntryStats, startTimeSw);
+ registerEvent(!fenceResult, requestProcessor.getRequestStats().getFenceReadEntryStats(), startTimeSw);
// send the fence read response
sendResponse(readResponse.build());
}
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ReadLacProcessorV3.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ReadLacProcessorV3.java
index 898ddb0..a3bc311 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ReadLacProcessorV3.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ReadLacProcessorV3.java
@@ -102,11 +102,11 @@ class ReadLacProcessorV3 extends PacketProcessorBaseV3 implements Runnable {
}
if (status == StatusCode.EOK) {
- requestProcessor.readLacStats.registerSuccessfulEvent(MathUtils.elapsedNanos(startTimeNanos),
- TimeUnit.NANOSECONDS);
+ requestProcessor.getRequestStats().getReadLacStats()
+ .registerSuccessfulEvent(MathUtils.elapsedNanos(startTimeNanos), TimeUnit.NANOSECONDS);
} else {
- requestProcessor.readLacStats.registerFailedEvent(MathUtils.elapsedNanos(startTimeNanos),
- TimeUnit.NANOSECONDS);
+ requestProcessor.getRequestStats().getReadLacStats()
+ .registerFailedEvent(MathUtils.elapsedNanos(startTimeNanos), TimeUnit.NANOSECONDS);
}
// Finally set the status and return
readLacResponse.setStatus(status);
@@ -126,6 +126,6 @@ class ReadLacProcessorV3 extends PacketProcessorBaseV3 implements Runnable {
.setReadLacResponse(readLacResponse);
sendResponse(response.getStatus(),
response.build(),
- requestProcessor.readLacRequestStats);
+ requestProcessor.getRequestStats().getReadLacRequestStats());
}
}
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/RequestStats.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/RequestStats.java
new file mode 100644
index 0000000..1799e66
--- /dev/null
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/RequestStats.java
@@ -0,0 +1,342 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.bookkeeper.proto;
+
+import static org.apache.bookkeeper.bookie.BookKeeperServerStats.ADD_ENTRY;
+import static org.apache.bookkeeper.bookie.BookKeeperServerStats.ADD_ENTRY_BLOCKED;
+import static org.apache.bookkeeper.bookie.BookKeeperServerStats.ADD_ENTRY_BLOCKED_WAIT;
+import static org.apache.bookkeeper.bookie.BookKeeperServerStats.ADD_ENTRY_IN_PROGRESS;
+import static org.apache.bookkeeper.bookie.BookKeeperServerStats.ADD_ENTRY_REQUEST;
+import static org.apache.bookkeeper.bookie.BookKeeperServerStats.CATEGORY_SERVER;
+import static org.apache.bookkeeper.bookie.BookKeeperServerStats.CHANNEL_WRITE;
+import static org.apache.bookkeeper.bookie.BookKeeperServerStats.FORCE_LEDGER;
+import static org.apache.bookkeeper.bookie.BookKeeperServerStats.FORCE_LEDGER_REQUEST;
+import static org.apache.bookkeeper.bookie.BookKeeperServerStats.GET_BOOKIE_INFO;
+import static org.apache.bookkeeper.bookie.BookKeeperServerStats.GET_BOOKIE_INFO_REQUEST;
+import static org.apache.bookkeeper.bookie.BookKeeperServerStats.READ_ENTRY;
+import static org.apache.bookkeeper.bookie.BookKeeperServerStats.READ_ENTRY_BLOCKED;
+import static org.apache.bookkeeper.bookie.BookKeeperServerStats.READ_ENTRY_BLOCKED_WAIT;
+import static org.apache.bookkeeper.bookie.BookKeeperServerStats.READ_ENTRY_FENCE_READ;
+import static org.apache.bookkeeper.bookie.BookKeeperServerStats.READ_ENTRY_FENCE_REQUEST;
+import static org.apache.bookkeeper.bookie.BookKeeperServerStats.READ_ENTRY_FENCE_WAIT;
+import static org.apache.bookkeeper.bookie.BookKeeperServerStats.READ_ENTRY_IN_PROGRESS;
+import static org.apache.bookkeeper.bookie.BookKeeperServerStats.READ_ENTRY_LONG_POLL_PRE_WAIT;
+import static org.apache.bookkeeper.bookie.BookKeeperServerStats.READ_ENTRY_LONG_POLL_READ;
+import static org.apache.bookkeeper.bookie.BookKeeperServerStats.READ_ENTRY_LONG_POLL_REQUEST;
+import static org.apache.bookkeeper.bookie.BookKeeperServerStats.READ_ENTRY_LONG_POLL_WAIT;
+import static org.apache.bookkeeper.bookie.BookKeeperServerStats.READ_ENTRY_REQUEST;
+import static org.apache.bookkeeper.bookie.BookKeeperServerStats.READ_ENTRY_SCHEDULING_DELAY;
+import static org.apache.bookkeeper.bookie.BookKeeperServerStats.READ_LAC;
+import static org.apache.bookkeeper.bookie.BookKeeperServerStats.READ_LAC_REQUEST;
+import static org.apache.bookkeeper.bookie.BookKeeperServerStats.READ_LAST_ENTRY_NOENTRY_ERROR;
+import static org.apache.bookkeeper.bookie.BookKeeperServerStats.SERVER_SCOPE;
+import static org.apache.bookkeeper.bookie.BookKeeperServerStats.WRITE_LAC;
+import static org.apache.bookkeeper.bookie.BookKeeperServerStats.WRITE_LAC_REQUEST;
+
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import lombok.Getter;
+import org.apache.bookkeeper.stats.Counter;
+import org.apache.bookkeeper.stats.Gauge;
+import org.apache.bookkeeper.stats.OpStatsLogger;
+import org.apache.bookkeeper.stats.StatsLogger;
+import org.apache.bookkeeper.stats.annotations.StatsDoc;
+
+/**
+ * An umbrella class for request related stats.
+ *
+ * <p>It groups the {@link OpStatsLogger}s and the {@link Counter} recorded by the bookie request
+ * processors, and owns the counters of in-progress / blocked add and read requests that are
+ * exported as gauges. Accessors for all fields are generated by lombok's {@code @Getter}.
+ */
+@StatsDoc(
+    name = SERVER_SCOPE,
+    category = CATEGORY_SERVER,
+    help = "Bookie request stats"
+)
+@Getter
+public class RequestStats {
+
+    // Live request counters. The *InProgress and *Blocked counters back the gauges registered
+    // in the constructor; the max* counters keep the highest in-progress value seen so far.
+    final AtomicInteger addsInProgress = new AtomicInteger(0);
+    final AtomicInteger maxAddsInProgress = new AtomicInteger(0);
+    final AtomicInteger addsBlocked = new AtomicInteger(0);
+    final AtomicInteger readsInProgress = new AtomicInteger(0);
+    final AtomicInteger readsBlocked = new AtomicInteger(0);
+    final AtomicInteger maxReadsInProgress = new AtomicInteger(0);
+
+    @StatsDoc(
+        name = ADD_ENTRY_REQUEST,
+        help = "request stats of AddEntry on a bookie"
+    )
+    private final OpStatsLogger addRequestStats;
+    @StatsDoc(
+        name = ADD_ENTRY,
+        help = "operation stats of AddEntry on a bookie",
+        parent = ADD_ENTRY_REQUEST
+    )
+    private final OpStatsLogger addEntryStats;
+    @StatsDoc(
+        name = READ_ENTRY_REQUEST,
+        help = "request stats of ReadEntry on a bookie"
+    )
+    final OpStatsLogger readRequestStats;
+    @StatsDoc(
+        name = READ_ENTRY,
+        help = "operation stats of ReadEntry on a bookie",
+        parent = READ_ENTRY_REQUEST
+    )
+    final OpStatsLogger readEntryStats;
+    @StatsDoc(
+        name = FORCE_LEDGER,
+        help = "operation stats of ForceLedger on a bookie",
+        parent = FORCE_LEDGER_REQUEST
+    )
+    final OpStatsLogger forceLedgerStats;
+    @StatsDoc(
+        name = FORCE_LEDGER_REQUEST,
+        help = "request stats of ForceLedger on a bookie"
+    )
+    final OpStatsLogger forceLedgerRequestStats;
+    @StatsDoc(
+        name = READ_ENTRY_FENCE_REQUEST,
+        help = "request stats of FenceRead on a bookie"
+    )
+    final OpStatsLogger fenceReadRequestStats;
+    @StatsDoc(
+        name = READ_ENTRY_FENCE_READ,
+        help = "operation stats of FenceRead on a bookie",
+        parent = READ_ENTRY_FENCE_REQUEST,
+        happensAfter = READ_ENTRY_FENCE_WAIT
+    )
+    final OpStatsLogger fenceReadEntryStats;
+    @StatsDoc(
+        name = READ_ENTRY_FENCE_WAIT,
+        help = "operation stats of FenceReadWait on a bookie",
+        parent = READ_ENTRY_FENCE_REQUEST
+    )
+    final OpStatsLogger fenceReadWaitStats;
+    @StatsDoc(
+        name = READ_ENTRY_SCHEDULING_DELAY,
+        help = "operation stats of ReadEntry scheduling delays on a bookie"
+    )
+    final OpStatsLogger readEntrySchedulingDelayStats;
+    @StatsDoc(
+        name = READ_ENTRY_LONG_POLL_PRE_WAIT,
+        help = "operation stats of LongPoll Reads pre wait time on a bookie",
+        parent = READ_ENTRY_LONG_POLL_REQUEST
+    )
+    final OpStatsLogger longPollPreWaitStats;
+    @StatsDoc(
+        name = READ_ENTRY_LONG_POLL_WAIT,
+        help = "operation stats of LongPoll Reads wait time on a bookie",
+        happensAfter = READ_ENTRY_LONG_POLL_PRE_WAIT,
+        parent = READ_ENTRY_LONG_POLL_REQUEST
+    )
+    final OpStatsLogger longPollWaitStats;
+    @StatsDoc(
+        name = READ_ENTRY_LONG_POLL_READ,
+        help = "operation stats of LongPoll Reads on a bookie",
+        happensAfter = READ_ENTRY_LONG_POLL_WAIT,
+        parent = READ_ENTRY_LONG_POLL_REQUEST
+    )
+    final OpStatsLogger longPollReadStats;
+    @StatsDoc(
+        name = READ_ENTRY_LONG_POLL_REQUEST,
+        help = "request stats of LongPoll Reads on a bookie"
+    )
+    final OpStatsLogger longPollReadRequestStats;
+    @StatsDoc(
+        name = READ_LAST_ENTRY_NOENTRY_ERROR,
+        help = "total NOENTRY errors of reading last entry on a bookie"
+    )
+    final Counter readLastEntryNoEntryErrorCounter;
+    @StatsDoc(
+        name = WRITE_LAC_REQUEST,
+        help = "request stats of WriteLac on a bookie"
+    )
+    final OpStatsLogger writeLacRequestStats;
+    @StatsDoc(
+        name = WRITE_LAC,
+        help = "operation stats of WriteLac on a bookie",
+        parent = WRITE_LAC_REQUEST
+    )
+    final OpStatsLogger writeLacStats;
+    @StatsDoc(
+        name = READ_LAC_REQUEST,
+        help = "request stats of ReadLac on a bookie"
+    )
+    final OpStatsLogger readLacRequestStats;
+    @StatsDoc(
+        name = READ_LAC,
+        help = "operation stats of ReadLac on a bookie",
+        parent = READ_LAC_REQUEST
+    )
+    final OpStatsLogger readLacStats;
+    @StatsDoc(
+        name = GET_BOOKIE_INFO_REQUEST,
+        help = "request stats of GetBookieInfo on a bookie"
+    )
+    final OpStatsLogger getBookieInfoRequestStats;
+    @StatsDoc(
+        name = GET_BOOKIE_INFO,
+        help = "operation stats of GetBookieInfo on a bookie",
+        parent = GET_BOOKIE_INFO_REQUEST
+    )
+    final OpStatsLogger getBookieInfoStats;
+    @StatsDoc(
+        name = CHANNEL_WRITE,
+        help = "channel write stats on a bookie"
+    )
+    final OpStatsLogger channelWriteStats;
+    // NOTE: the blocked-wait loggers are created under the *_BLOCKED_WAIT names below;
+    // the plain *_BLOCKED names belong to the gauges registered in the constructor.
+    @StatsDoc(
+        name = ADD_ENTRY_BLOCKED_WAIT,
+        help = "operation stats of AddEntry blocked on a bookie"
+    )
+    final OpStatsLogger addEntryBlockedStats;
+    @StatsDoc(
+        name = READ_ENTRY_BLOCKED_WAIT,
+        help = "operation stats of ReadEntry blocked on a bookie"
+    )
+    final OpStatsLogger readEntryBlockedStats;
+
+    /**
+     * Creates all request stats under the given stats logger and registers the
+     * in-progress / blocked gauges for add and read requests.
+     *
+     * @param statsLogger the stats logger used to create the op stats loggers, counter and gauges
+     */
+    public RequestStats(StatsLogger statsLogger) {
+        this.addEntryStats = statsLogger.getOpStatsLogger(ADD_ENTRY);
+        this.addRequestStats = statsLogger.getOpStatsLogger(ADD_ENTRY_REQUEST);
+        this.readEntryStats = statsLogger.getOpStatsLogger(READ_ENTRY);
+        this.forceLedgerStats = statsLogger.getOpStatsLogger(FORCE_LEDGER);
+        this.forceLedgerRequestStats = statsLogger.getOpStatsLogger(FORCE_LEDGER_REQUEST);
+        this.readRequestStats = statsLogger.getOpStatsLogger(READ_ENTRY_REQUEST);
+        this.fenceReadEntryStats = statsLogger.getOpStatsLogger(READ_ENTRY_FENCE_READ);
+        this.fenceReadRequestStats = statsLogger.getOpStatsLogger(READ_ENTRY_FENCE_REQUEST);
+        this.fenceReadWaitStats = statsLogger.getOpStatsLogger(READ_ENTRY_FENCE_WAIT);
+        this.readEntrySchedulingDelayStats = statsLogger.getOpStatsLogger(READ_ENTRY_SCHEDULING_DELAY);
+        this.longPollPreWaitStats = statsLogger.getOpStatsLogger(READ_ENTRY_LONG_POLL_PRE_WAIT);
+        this.longPollWaitStats = statsLogger.getOpStatsLogger(READ_ENTRY_LONG_POLL_WAIT);
+        this.longPollReadStats = statsLogger.getOpStatsLogger(READ_ENTRY_LONG_POLL_READ);
+        this.longPollReadRequestStats = statsLogger.getOpStatsLogger(READ_ENTRY_LONG_POLL_REQUEST);
+        this.readLastEntryNoEntryErrorCounter = statsLogger.getCounter(READ_LAST_ENTRY_NOENTRY_ERROR);
+        this.writeLacStats = statsLogger.getOpStatsLogger(WRITE_LAC);
+        this.writeLacRequestStats = statsLogger.getOpStatsLogger(WRITE_LAC_REQUEST);
+        this.readLacStats = statsLogger.getOpStatsLogger(READ_LAC);
+        this.readLacRequestStats = statsLogger.getOpStatsLogger(READ_LAC_REQUEST);
+        this.getBookieInfoStats = statsLogger.getOpStatsLogger(GET_BOOKIE_INFO);
+        this.getBookieInfoRequestStats = statsLogger.getOpStatsLogger(GET_BOOKIE_INFO_REQUEST);
+        this.channelWriteStats = statsLogger.getOpStatsLogger(CHANNEL_WRITE);
+
+        this.addEntryBlockedStats = statsLogger.getOpStatsLogger(ADD_ENTRY_BLOCKED_WAIT);
+        this.readEntryBlockedStats = statsLogger.getOpStatsLogger(READ_ENTRY_BLOCKED_WAIT);
+
+        registerCounterGauge(statsLogger, ADD_ENTRY_IN_PROGRESS, addsInProgress);
+        registerCounterGauge(statsLogger, ADD_ENTRY_BLOCKED, addsBlocked);
+        registerCounterGauge(statsLogger, READ_ENTRY_IN_PROGRESS, readsInProgress);
+        registerCounterGauge(statsLogger, READ_ENTRY_BLOCKED, readsBlocked);
+    }
+
+    /**
+     * Registers a gauge on {@code statsLogger} that reports the given counter.
+     *
+     * <p>The gauge's default value is {@code 0} and every sample returns the live
+     * {@link AtomicInteger} itself (it is a {@link Number}), not a snapshot copy.
+     *
+     * @param statsLogger the stats logger to register the gauge on
+     * @param name the gauge name
+     * @param counter the counter sampled by the gauge
+     */
+    private static void registerCounterGauge(StatsLogger statsLogger, String name, final AtomicInteger counter) {
+        statsLogger.registerGauge(name, new Gauge<Number>() {
+            @Override
+            public Number getDefaultValue() {
+                return 0;
+            }
+
+            @Override
+            public Number getSample() {
+                return counter;
+            }
+        });
+    }
+
+    //
+    // Add requests
+    //
+
+    /** Marks one more add request as blocked. */
+    void blockAddRequest() {
+        addsBlocked.incrementAndGet();
+    }
+
+    /**
+     * Marks one add request as no longer blocked and records how long it was blocked.
+     *
+     * @param delayNanos the blocked time in nanoseconds, recorded as a successful event
+     */
+    void unblockAddRequest(long delayNanos) {
+        addEntryBlockedStats.registerSuccessfulEvent(delayNanos, TimeUnit.NANOSECONDS);
+        addsBlocked.decrementAndGet();
+    }
+
+    /** Counts one more add request in progress and updates the recorded maximum. */
+    void trackAddRequest() {
+        final int curr = addsInProgress.incrementAndGet();
+        maxAddsInProgress.accumulateAndGet(curr, Integer::max);
+    }
+
+    /** Counts one add request as finished. */
+    void untrackAddRequest() {
+        addsInProgress.decrementAndGet();
+    }
+
+    /**
+     * Returns the highest number of concurrently in-progress add requests recorded so far.
+     *
+     * @return the maximum value observed by {@link #trackAddRequest()}
+     */
+    int maxAddsInProgressCount() {
+        return maxAddsInProgress.get();
+    }
+
+    //
+    // Read requests
+    //
+
+    /** Marks one more read request as blocked. */
+    void blockReadRequest() {
+        readsBlocked.incrementAndGet();
+    }
+
+    /**
+     * Marks one read request as no longer blocked and records how long it was blocked.
+     *
+     * @param delayNanos the blocked time in nanoseconds, recorded as a successful event
+     */
+    void unblockReadRequest(long delayNanos) {
+        readEntryBlockedStats.registerSuccessfulEvent(delayNanos, TimeUnit.NANOSECONDS);
+        readsBlocked.decrementAndGet();
+    }
+
+    /** Counts one more read request in progress and updates the recorded maximum. */
+    void trackReadRequest() {
+        final int curr = readsInProgress.incrementAndGet();
+        maxReadsInProgress.accumulateAndGet(curr, Integer::max);
+    }
+
+    /** Counts one read request as finished. */
+    void untrackReadRequest() {
+        readsInProgress.decrementAndGet();
+    }
+
+    /**
+     * Returns the highest number of concurrently in-progress read requests recorded so far.
+     *
+     * @return the maximum value observed by {@link #trackReadRequest()}
+     */
+    int maxReadsInProgressCount() {
+        return maxReadsInProgress.get();
+    }
+
+}
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/WriteEntryProcessor.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/WriteEntryProcessor.java
index f5af75a..70db7ce 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/WriteEntryProcessor.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/WriteEntryProcessor.java
@@ -63,7 +63,7 @@ class WriteEntryProcessor extends PacketProcessorBase<ParsedAddRequest> implemen
+ " so rejecting the request from the client!");
sendResponse(BookieProtocol.EREADONLY,
ResponseBuilder.buildErrorResponse(BookieProtocol.EREADONLY, request),
- requestProcessor.getAddRequestStats());
+ requestProcessor.getRequestStats().getAddRequestStats());
request.release();
request.recycle();
return;
@@ -104,11 +104,11 @@ class WriteEntryProcessor extends PacketProcessorBase<ParsedAddRequest> implemen
}
if (rc != BookieProtocol.EOK) {
- requestProcessor.getAddEntryStats().registerFailedEvent(MathUtils.elapsedNanos(startTimeNanos),
- TimeUnit.NANOSECONDS);
+ requestProcessor.getRequestStats().getAddEntryStats()
+ .registerFailedEvent(MathUtils.elapsedNanos(startTimeNanos), TimeUnit.NANOSECONDS);
sendResponse(rc,
ResponseBuilder.buildErrorResponse(rc, request),
- requestProcessor.getAddRequestStats());
+ requestProcessor.getRequestStats().getAddRequestStats());
request.recycle();
}
}
@@ -117,15 +117,15 @@ class WriteEntryProcessor extends PacketProcessorBase<ParsedAddRequest> implemen
public void writeComplete(int rc, long ledgerId, long entryId,
BookieSocketAddress addr, Object ctx) {
if (BookieProtocol.EOK == rc) {
- requestProcessor.getAddEntryStats().registerSuccessfulEvent(MathUtils.elapsedNanos(startTimeNanos),
- TimeUnit.NANOSECONDS);
+ requestProcessor.getRequestStats().getAddEntryStats()
+ .registerSuccessfulEvent(MathUtils.elapsedNanos(startTimeNanos), TimeUnit.NANOSECONDS);
} else {
- requestProcessor.getAddEntryStats().registerFailedEvent(MathUtils.elapsedNanos(startTimeNanos),
- TimeUnit.NANOSECONDS);
+ requestProcessor.getRequestStats().getAddEntryStats()
+ .registerFailedEvent(MathUtils.elapsedNanos(startTimeNanos), TimeUnit.NANOSECONDS);
}
sendResponse(rc,
ResponseBuilder.buildAddResponse(request),
- requestProcessor.getAddRequestStats());
+ requestProcessor.getRequestStats().getAddRequestStats());
request.recycle();
recycle();
}
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/WriteEntryProcessorV3.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/WriteEntryProcessorV3.java
index 7747e5c..c8ea067 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/WriteEntryProcessorV3.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/WriteEntryProcessorV3.java
@@ -80,11 +80,11 @@ class WriteEntryProcessorV3 extends PacketProcessorBaseV3 {
public void writeComplete(int rc, long ledgerId, long entryId,
BookieSocketAddress addr, Object ctx) {
if (BookieProtocol.EOK == rc) {
- requestProcessor.getAddEntryStats().registerSuccessfulEvent(MathUtils.elapsedNanos(startTimeNanos),
- TimeUnit.NANOSECONDS);
+ requestProcessor.getRequestStats().getAddEntryStats()
+ .registerSuccessfulEvent(MathUtils.elapsedNanos(startTimeNanos), TimeUnit.NANOSECONDS);
} else {
- requestProcessor.getAddEntryStats().registerFailedEvent(MathUtils.elapsedNanos(startTimeNanos),
- TimeUnit.NANOSECONDS);
+ requestProcessor.getRequestStats().getAddEntryStats()
+ .registerFailedEvent(MathUtils.elapsedNanos(startTimeNanos), TimeUnit.NANOSECONDS);
}
StatusCode status;
@@ -105,7 +105,7 @@ class WriteEntryProcessorV3 extends PacketProcessorBaseV3 {
.setStatus(addResponse.getStatus())
.setAddResponse(addResponse);
Response resp = response.build();
- sendResponse(status, resp, requestProcessor.getAddRequestStats());
+ sendResponse(status, resp, requestProcessor.getRequestStats().getAddRequestStats());
}
};
final EnumSet<WriteFlag> writeFlags;
@@ -171,7 +171,7 @@ class WriteEntryProcessorV3 extends PacketProcessorBaseV3 {
.setAddResponse(addResponse);
Response resp = response.build();
sendResponse(addResponse.getStatus(), resp,
- requestProcessor.getAddRequestStats());
+ requestProcessor.getRequestStats().getAddRequestStats());
}
}
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/WriteLacProcessorV3.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/WriteLacProcessorV3.java
index 7e42a73..691102b 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/WriteLacProcessorV3.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/WriteLacProcessorV3.java
@@ -71,11 +71,11 @@ class WriteLacProcessorV3 extends PacketProcessorBaseV3 implements Runnable {
@Override
public void writeComplete(int rc, long ledgerId, long entryId, BookieSocketAddress addr, Object ctx) {
if (BookieProtocol.EOK == rc) {
- requestProcessor.writeLacStats.registerSuccessfulEvent(MathUtils.elapsedNanos(startTimeNanos),
- TimeUnit.NANOSECONDS);
+ requestProcessor.getRequestStats().getWriteLacStats()
+ .registerSuccessfulEvent(MathUtils.elapsedNanos(startTimeNanos), TimeUnit.NANOSECONDS);
} else {
- requestProcessor.writeLacStats.registerFailedEvent(MathUtils.elapsedNanos(startTimeNanos),
- TimeUnit.NANOSECONDS);
+ requestProcessor.getRequestStats().getWriteLacStats()
+ .registerFailedEvent(MathUtils.elapsedNanos(startTimeNanos), TimeUnit.NANOSECONDS);
}
StatusCode status;
@@ -96,7 +96,7 @@ class WriteLacProcessorV3 extends PacketProcessorBaseV3 implements Runnable {
.setStatus(writeLacResponse.getStatus())
.setWriteLacResponse(writeLacResponse);
Response resp = response.build();
- sendResponse(status, resp, requestProcessor.writeLacRequestStats);
+ sendResponse(status, resp, requestProcessor.getRequestStats().getWriteLacRequestStats());
}
};
@@ -130,8 +130,8 @@ class WriteLacProcessorV3 extends PacketProcessorBaseV3 implements Runnable {
// If everything is okay, we return null so that the calling function
// dosn't return a response back to the caller.
if (!status.equals(StatusCode.EOK)) {
- requestProcessor.writeLacStats.registerFailedEvent(MathUtils.elapsedNanos(startTimeNanos),
- TimeUnit.NANOSECONDS);
+ requestProcessor.getRequestStats().getWriteLacStats()
+ .registerFailedEvent(MathUtils.elapsedNanos(startTimeNanos), TimeUnit.NANOSECONDS);
writeLacResponse.setStatus(status);
return writeLacResponse.build();
}
@@ -147,7 +147,10 @@ class WriteLacProcessorV3 extends PacketProcessorBaseV3 implements Runnable {
.setStatus(writeLacResponse.getStatus())
.setWriteLacResponse(writeLacResponse);
Response resp = response.build();
- sendResponse(writeLacResponse.getStatus(), resp, requestProcessor.writeLacRequestStats);
+ sendResponse(
+ writeLacResponse.getStatus(),
+ resp,
+ requestProcessor.getRequestStats().getWriteLacRequestStats());
}
}
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/ForceLedgerProcessorV3Test.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/ForceLedgerProcessorV3Test.java
index 37d4647..bab83fb 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/ForceLedgerProcessorV3Test.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/ForceLedgerProcessorV3Test.java
@@ -75,10 +75,7 @@ public class ForceLedgerProcessorV3Test {
requestProcessor = mock(BookieRequestProcessor.class);
when(requestProcessor.getBookie()).thenReturn(bookie);
when(requestProcessor.getWaitTimeoutOnBackpressureMillis()).thenReturn(-1L);
- when(requestProcessor.getForceLedgerStats())
- .thenReturn(NullStatsLogger.INSTANCE.getOpStatsLogger("force_ledger"));
- when(requestProcessor.getForceLedgerRequestStats())
- .thenReturn(NullStatsLogger.INSTANCE.getOpStatsLogger("force_ledger_request"));
+ when(requestProcessor.getRequestStats()).thenReturn(new RequestStats(NullStatsLogger.INSTANCE));
processor = new ForceLedgerProcessorV3(
request,
channel,
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/WriteEntryProcessorTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/WriteEntryProcessorTest.java
index 5901c2f..bbcffea 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/WriteEntryProcessorTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/WriteEntryProcessorTest.java
@@ -67,10 +67,7 @@ public class WriteEntryProcessorTest {
bookie = mock(Bookie.class);
requestProcessor = mock(BookieRequestProcessor.class);
when(requestProcessor.getBookie()).thenReturn(bookie);
- when(requestProcessor.getAddEntryStats())
- .thenReturn(NullStatsLogger.INSTANCE.getOpStatsLogger("add_entry"));
- when(requestProcessor.getAddRequestStats())
- .thenReturn(NullStatsLogger.INSTANCE.getOpStatsLogger("add_requests"));
+ when(requestProcessor.getRequestStats()).thenReturn(new RequestStats(NullStatsLogger.INSTANCE));
processor = WriteEntryProcessor.create(
request,
channel,
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/WriteEntryProcessorV3Test.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/WriteEntryProcessorV3Test.java
index df7b153..292dc51 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/WriteEntryProcessorV3Test.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/WriteEntryProcessorV3Test.java
@@ -80,12 +80,7 @@ public class WriteEntryProcessorV3Test {
requestProcessor = mock(BookieRequestProcessor.class);
when(requestProcessor.getBookie()).thenReturn(bookie);
when(requestProcessor.getWaitTimeoutOnBackpressureMillis()).thenReturn(-1L);
- when(requestProcessor.getAddEntryStats())
- .thenReturn(NullStatsLogger.INSTANCE.getOpStatsLogger("add_entry"));
- when(requestProcessor.getAddRequestStats())
- .thenReturn(NullStatsLogger.INSTANCE.getOpStatsLogger("add_requests"));
- when(requestProcessor.getChannelWriteStats())
- .thenReturn(NullStatsLogger.INSTANCE.getOpStatsLogger("CHANNEL_WRITE"));
+ when(requestProcessor.getRequestStats()).thenReturn(new RequestStats(NullStatsLogger.INSTANCE));
processor = new WriteEntryProcessorV3(
request,
channel,
diff --git a/bookkeeper-stats/src/main/java/org/apache/bookkeeper/stats/annotations/StatsDoc.java b/bookkeeper-stats/src/main/java/org/apache/bookkeeper/stats/annotations/StatsDoc.java
index 97f487a..d2ca8c6 100644
--- a/bookkeeper-stats/src/main/java/org/apache/bookkeeper/stats/annotations/StatsDoc.java
+++ b/bookkeeper-stats/src/main/java/org/apache/bookkeeper/stats/annotations/StatsDoc.java
@@ -58,5 +58,25 @@ public @interface StatsDoc {
*/
String help();
+ /**
+ * The parent metric name.
+ *
+ * <p>It can be used for analyzing the relationships
+ * between the metrics, especially for the latency metrics.
+ *
+ * @return the parent metric name
+ */
+ String parent() default "";
+
+ /**
+ * The metric name of the operation that precedes this one, i.e. the
+ * operation of this metric happens after the operation of the named metric.
+ *
+ * <p>Similar to {@link #parent()}, it can be used for analyzing
+ * the dependencies between metrics.
+ *
+ * @return the metric name of the operation that the operation of this metric happens after.
+ */
+ String happensAfter() default "";
}