You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@bookkeeper.apache.org by si...@apache.org on 2018/01/30 19:46:29 UTC
[bookkeeper] branch master updated: ISSUE #934: Additional stats to
track netty latencies in PCBC
This is an automated email from the ASF dual-hosted git repository.
sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git
The following commit(s) were added to refs/heads/master by this push:
new f3166dd ISSUE #934: Additional stats to track netty latencies in PCBC
f3166dd is described below
commit f3166dde6f0429be121110ee534a5574d136248a
Author: Kishore Udayashankar <ku...@salesforce.com>
AuthorDate: Tue Jan 30 11:46:22 2018 -0800
ISSUE #934: Additional stats to track netty latencies in PCBC
Descriptions of the changes in this PR:
(bug W-4058645) add metrics to measure netty stack latency
- Counters to track outstanding reads/writes operations
- Collect Netty IO rate and latency histograms
rev ayegorov
PerChannelBookieClient: add additional stats
Adds:
- exceptionCounter
- connectTimer
- addEntryOutstanding
- readEntryOutstanding
- nettyOpLogger
Signed-off-by: Kishore Kasi Udayashankar<kudayashankarsalesforce.com>
Master Issue: #934
Author: Kishore Udayashankar <ku...@salesforce.com>
Reviewers: Enrico Olivelli <eo...@gmail.com>, Sijie Guo <si...@apache.org>
This closes #971 from kishorekasi/pcbc-stats, closes #934
---
.../bookkeeper/client/BookKeeperClientStats.java | 5 ++
.../bookkeeper/proto/PerChannelBookieClient.java | 72 ++++++++++++++++++++--
2 files changed, 73 insertions(+), 4 deletions(-)
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeperClientStats.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeperClientStats.java
index 50ee81c..60058e3 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeperClientStats.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeperClientStats.java
@@ -66,4 +66,9 @@ public interface BookKeeperClientStats {
String TIMEOUT_GET_BOOKIE_INFO = "TIMEOUT_GET_BOOKIE_INFO";
String CHANNEL_START_TLS_OP = "START_TLS";
String CHANNEL_TIMEOUT_START_TLS_OP = "TIMEOUT_START_TLS";
+ String NETTY_EXCEPTION_CNT = "NETTY_EXCEPTION_CNT";
+ String CLIENT_CONNECT_TIMER = "CLIENT_CONNECT_TIMER";
+ String ADD_OP_OUTSTANDING = "ADD_OP_OUTSTANDING";
+ String READ_OP_OUTSTANDING = "READ_OP_OUTSTANDING";
+ String NETTY_OPS = "NETTY_OPS";
}
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java
index e0199ff..a486d57 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java
@@ -109,6 +109,7 @@ import org.apache.bookkeeper.proto.BookkeeperProtocol.Response;
import org.apache.bookkeeper.proto.BookkeeperProtocol.StatusCode;
import org.apache.bookkeeper.proto.BookkeeperProtocol.WriteLacRequest;
import org.apache.bookkeeper.proto.BookkeeperProtocol.WriteLacResponse;
+import org.apache.bookkeeper.stats.Counter;
import org.apache.bookkeeper.stats.NullStatsLogger;
import org.apache.bookkeeper.stats.OpStatsLogger;
import org.apache.bookkeeper.stats.StatsLogger;
@@ -173,6 +174,12 @@ public class PerChannelBookieClient extends ChannelInboundHandlerAdapter {
private final OpStatsLogger getBookieInfoTimeoutOpLogger;
private final OpStatsLogger startTLSOpLogger;
private final OpStatsLogger startTLSTimeoutOpLogger;
+ private final OpStatsLogger connectTimer;
+ private final Counter exceptionCounter;
+ private final Counter addEntryOutstanding;
+ private final Counter readEntryOutstanding;
+ /* collect stats on all Ops that flows through netty pipeline */
+ private final OpStatsLogger nettyOpLogger;
private final boolean useV2WireProtocol;
@@ -270,6 +277,11 @@ public class PerChannelBookieClient extends ChannelInboundHandlerAdapter {
getBookieInfoTimeoutOpLogger = statsLogger.getOpStatsLogger(BookKeeperClientStats.TIMEOUT_GET_BOOKIE_INFO);
startTLSOpLogger = statsLogger.getOpStatsLogger(BookKeeperClientStats.CHANNEL_START_TLS_OP);
startTLSTimeoutOpLogger = statsLogger.getOpStatsLogger(BookKeeperClientStats.CHANNEL_TIMEOUT_START_TLS_OP);
+ exceptionCounter = statsLogger.getCounter(BookKeeperClientStats.NETTY_EXCEPTION_CNT);
+ connectTimer = statsLogger.getOpStatsLogger(BookKeeperClientStats.CLIENT_CONNECT_TIMER);
+ addEntryOutstanding = statsLogger.getCounter(BookKeeperClientStats.ADD_OP_OUTSTANDING);
+ readEntryOutstanding = statsLogger.getCounter(BookKeeperClientStats.READ_OP_OUTSTANDING);
+ nettyOpLogger = statsLogger.getOpStatsLogger(BookKeeperClientStats.NETTY_OPS);
this.pcbcPool = pcbcPool;
@@ -360,6 +372,7 @@ public class PerChannelBookieClient extends ChannelInboundHandlerAdapter {
}
protected ChannelFuture connect() {
+ final long startTime = MathUtils.nowInNano();
if (LOG.isDebugEnabled()) {
LOG.debug("Connecting to bookie: {}", addr);
}
@@ -427,7 +440,8 @@ public class PerChannelBookieClient extends ChannelInboundHandlerAdapter {
}
ChannelFuture future = bootstrap.connect(bookieAddr);
- future.addListener(new ConnectionFutureListener());
+ future.addListener(new ConnectionFutureListener(startTime));
+
return future;
}
@@ -861,7 +875,21 @@ public class PerChannelBookieClient extends ChannelInboundHandlerAdapter {
}
try {
- channel.writeAndFlush(request, channel.voidPromise());
+ final long startTime = MathUtils.nowInNano();
+ ChannelFuture future = channel.writeAndFlush(request);
+ future.addListener(new ChannelFutureListener() {
+ @Override
+ public void operationComplete(ChannelFuture future) throws Exception {
+ if (future.isSuccess()) {
+ nettyOpLogger.registerSuccessfulEvent(MathUtils.elapsedNanos(startTime),
+ TimeUnit.NANOSECONDS);
+ completionObjects.get(key).setOutstanding();
+ } else {
+ nettyOpLogger.registerFailedEvent(MathUtils.elapsedNanos(startTime),
+ TimeUnit.NANOSECONDS);
+ }
+ }
+ });
} catch (Throwable e) {
LOG.warn("Operation {} failed", requestToString(request), e);
errorOut(key);
@@ -968,6 +996,7 @@ public class PerChannelBookieClient extends ChannelInboundHandlerAdapter {
*/
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
+ exceptionCounter.inc();
if (cause instanceof CorruptedFrameException || cause instanceof TooLongFrameException) {
LOG.error("Corrupted frame received from bookie: {}", ctx.channel().remoteAddress());
ctx.close();
@@ -1152,7 +1181,7 @@ public class PerChannelBookieClient extends ChannelInboundHandlerAdapter {
private void readV3Response(final Response response) {
final BKPacketHeader header = response.getHeader();
- final CompletionValue completionValue = completionObjects.remove(newCompletionKey(header.getTxnId(),
+ final CompletionValue completionValue = completionObjects.get(newCompletionKey(header.getTxnId(),
header.getOperation()));
if (null == completionValue) {
@@ -1177,6 +1206,8 @@ public class PerChannelBookieClient extends ChannelInboundHandlerAdapter {
}
});
}
+
+ completionObjects.remove(newCompletionKey(header.getTxnId(), header.getOperation()));
}
void initTLSHandshake() {
@@ -1328,6 +1359,9 @@ public class PerChannelBookieClient extends ChannelInboundHandlerAdapter {
public abstract void errorOut();
public abstract void errorOut(int rc);
+ public void setOutstanding() {
+ // no-op
+ }
protected void errorOutAndRunCallback(final Runnable callback) {
executor.submitOrdered(ledgerId,
@@ -1504,9 +1538,15 @@ public class PerChannelBookieClient extends ChannelInboundHandlerAdapter {
}
@Override
+ public void setOutstanding() {
+ readEntryOutstanding.inc();
+ }
+
+ @Override
public void handleV2Response(long ledgerId, long entryId,
StatusCode status,
BookieProtocol.Response response) {
+ readEntryOutstanding.dec();
if (!(response instanceof BookieProtocol.ReadResponse)) {
return;
}
@@ -1521,6 +1561,7 @@ public class PerChannelBookieClient extends ChannelInboundHandlerAdapter {
@Override
public void handleV3Response(BookkeeperProtocol.Response response) {
+ readEntryOutstanding.dec();
ReadResponse readResponse = response.getReadResponse();
StatusCode status = response.getStatus() == StatusCode.EOK
? readResponse.getStatus() : response.getStatus();
@@ -1736,15 +1777,22 @@ public class PerChannelBookieClient extends ChannelInboundHandlerAdapter {
}
@Override
+ public void setOutstanding() {
+ addEntryOutstanding.inc();
+ }
+
+ @Override
public void handleV2Response(
long ledgerId, long entryId, StatusCode status,
BookieProtocol.Response response) {
+ addEntryOutstanding.dec();
handleResponse(ledgerId, entryId, status);
}
@Override
public void handleV3Response(
BookkeeperProtocol.Response response) {
+ addEntryOutstanding.dec();
AddResponse addResponse = response.getAddResponse();
StatusCode status = response.getStatus() == StatusCode.EOK
? addResponse.getStatus() : response.getStatus();
@@ -1912,13 +1960,29 @@ public class PerChannelBookieClient extends ChannelInboundHandlerAdapter {
/**
* Connection listener.
*/
- public class ConnectionFutureListener implements ChannelFutureListener {
+ class ConnectionFutureListener implements ChannelFutureListener {
+ private final long startTime;
+
+ ConnectionFutureListener(long startTime) {
+ this.startTime = startTime;
+ }
+
@Override
public void operationComplete(ChannelFuture future) throws Exception {
LOG.debug("Channel connected ({}) {}", future.isSuccess(), future.channel());
int rc;
Queue<GenericCallback<PerChannelBookieClient>> oldPendingOps;
+ /* We fill in the timer based on whether the connect operation itself succeeded regardless of
+ * whether there was a race */
+ if (future.isSuccess()) {
+ PerChannelBookieClient.this
+ .connectTimer.registerSuccessfulEvent(MathUtils.elapsedNanos(startTime), TimeUnit.NANOSECONDS);
+ } else {
+ PerChannelBookieClient.this
+ .connectTimer.registerFailedEvent(MathUtils.elapsedNanos(startTime), TimeUnit.NANOSECONDS);
+ }
+
synchronized (PerChannelBookieClient.this) {
if (future.isSuccess() && state == ConnectionState.CONNECTING && future.channel().isActive()) {
LOG.info("Successfully connected to bookie: {}", future.channel());
--
To stop receiving notification emails like this one, please contact
sijie@apache.org.