You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@bookkeeper.apache.org by GitBox <gi...@apache.org> on 2018/01/30 19:46:36 UTC

[GitHub] sijie closed pull request #971: Issue 934: Additional stats to track netty latencies in PCBC

sijie closed pull request #971: Issue 934: Additional stats to track netty latencies in PCBC
URL: https://github.com/apache/bookkeeper/pull/971
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

Because this is a foreign pull request (from a fork), GitHub does not
show its diff after the merge, so the diff is supplied below:

diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeperClientStats.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeperClientStats.java
index 02947a4bb..b3fd8d614 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeperClientStats.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeperClientStats.java
@@ -64,4 +64,9 @@
     String TIMEOUT_GET_BOOKIE_INFO = "TIMEOUT_GET_BOOKIE_INFO";
     String CHANNEL_START_TLS_OP = "START_TLS";
     String CHANNEL_TIMEOUT_START_TLS_OP = "TIMEOUT_START_TLS";
+    String NETTY_EXCEPTION_CNT = "NETTY_EXCEPTION_CNT";
+    String CLIENT_CONNECT_TIMER = "CLIENT_CONNECT_TIMER";
+    String ADD_OP_OUTSTANDING = "ADD_OP_OUTSTANDING";
+    String READ_OP_OUTSTANDING = "READ_OP_OUTSTANDING";
+    String NETTY_OPS = "NETTY_OPS";
 }
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java
index e0199ffb9..a486d5716 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java
@@ -109,6 +109,7 @@
 import org.apache.bookkeeper.proto.BookkeeperProtocol.StatusCode;
 import org.apache.bookkeeper.proto.BookkeeperProtocol.WriteLacRequest;
 import org.apache.bookkeeper.proto.BookkeeperProtocol.WriteLacResponse;
+import org.apache.bookkeeper.stats.Counter;
 import org.apache.bookkeeper.stats.NullStatsLogger;
 import org.apache.bookkeeper.stats.OpStatsLogger;
 import org.apache.bookkeeper.stats.StatsLogger;
@@ -173,6 +174,12 @@
     private final OpStatsLogger getBookieInfoTimeoutOpLogger;
     private final OpStatsLogger startTLSOpLogger;
     private final OpStatsLogger startTLSTimeoutOpLogger;
+    private final OpStatsLogger connectTimer;
+    private final Counter exceptionCounter;
+    private final Counter addEntryOutstanding;
+    private final Counter readEntryOutstanding;
+    /* collect stats on all ops that flow through the netty pipeline */
+    private final OpStatsLogger nettyOpLogger;
 
     private final boolean useV2WireProtocol;
 
@@ -270,6 +277,11 @@ public PerChannelBookieClient(ClientConfiguration conf, OrderedSafeExecutor exec
         getBookieInfoTimeoutOpLogger = statsLogger.getOpStatsLogger(BookKeeperClientStats.TIMEOUT_GET_BOOKIE_INFO);
         startTLSOpLogger = statsLogger.getOpStatsLogger(BookKeeperClientStats.CHANNEL_START_TLS_OP);
         startTLSTimeoutOpLogger = statsLogger.getOpStatsLogger(BookKeeperClientStats.CHANNEL_TIMEOUT_START_TLS_OP);
+        exceptionCounter = statsLogger.getCounter(BookKeeperClientStats.NETTY_EXCEPTION_CNT);
+        connectTimer = statsLogger.getOpStatsLogger(BookKeeperClientStats.CLIENT_CONNECT_TIMER);
+        addEntryOutstanding = statsLogger.getCounter(BookKeeperClientStats.ADD_OP_OUTSTANDING);
+        readEntryOutstanding = statsLogger.getCounter(BookKeeperClientStats.READ_OP_OUTSTANDING);
+        nettyOpLogger = statsLogger.getOpStatsLogger(BookKeeperClientStats.NETTY_OPS);
 
         this.pcbcPool = pcbcPool;
 
@@ -360,6 +372,7 @@ protected long getNumPendingCompletionRequests() {
     }
 
     protected ChannelFuture connect() {
+        final long startTime = MathUtils.nowInNano();
         if (LOG.isDebugEnabled()) {
             LOG.debug("Connecting to bookie: {}", addr);
         }
@@ -427,7 +440,8 @@ protected void initChannel(Channel ch) throws Exception {
         }
 
         ChannelFuture future = bootstrap.connect(bookieAddr);
-        future.addListener(new ConnectionFutureListener());
+        future.addListener(new ConnectionFutureListener(startTime));
+
         return future;
     }
 
@@ -861,7 +875,21 @@ private void writeAndFlush(final Channel channel,
         }
 
         try {
-            channel.writeAndFlush(request, channel.voidPromise());
+            final long startTime = MathUtils.nowInNano();
+            ChannelFuture future = channel.writeAndFlush(request);
+            future.addListener(new ChannelFutureListener() {
+                @Override
+                public void operationComplete(ChannelFuture future) throws Exception {
+                    if (future.isSuccess()) {
+                        nettyOpLogger.registerSuccessfulEvent(MathUtils.elapsedNanos(startTime),
+                                TimeUnit.NANOSECONDS);
+                        completionObjects.get(key).setOutstanding();
+                    } else {
+                        nettyOpLogger.registerFailedEvent(MathUtils.elapsedNanos(startTime),
+                                TimeUnit.NANOSECONDS);
+                    }
+                }
+            });
         } catch (Throwable e) {
             LOG.warn("Operation {} failed", requestToString(request), e);
             errorOut(key);
@@ -968,6 +996,7 @@ public void channelInactive(ChannelHandlerContext ctx) throws Exception {
      */
     @Override
     public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
+        exceptionCounter.inc();
         if (cause instanceof CorruptedFrameException || cause instanceof TooLongFrameException) {
             LOG.error("Corrupted frame received from bookie: {}", ctx.channel().remoteAddress());
             ctx.close();
@@ -1152,7 +1181,7 @@ private static StatusCode getStatusCodeFromErrorCode(int errorCode) {
     private void readV3Response(final Response response) {
         final BKPacketHeader header = response.getHeader();
 
-        final CompletionValue completionValue = completionObjects.remove(newCompletionKey(header.getTxnId(),
+        final CompletionValue completionValue = completionObjects.get(newCompletionKey(header.getTxnId(),
                 header.getOperation()));
 
         if (null == completionValue) {
@@ -1177,6 +1206,8 @@ public String toString() {
                 }
             });
         }
+
+        completionObjects.remove(newCompletionKey(header.getTxnId(), header.getOperation()));
     }
 
     void initTLSHandshake() {
@@ -1328,6 +1359,9 @@ protected int logAndConvertStatus(StatusCode status, int defaultStatus,
 
         public abstract void errorOut();
         public abstract void errorOut(int rc);
+        public void setOutstanding() {
+            // no-op
+        }
 
         protected void errorOutAndRunCallback(final Runnable callback) {
             executor.submitOrdered(ledgerId,
@@ -1503,10 +1537,16 @@ public void errorOut(final int rc) {
                                                entryId, null, ctx));
         }
 
+        @Override
+        public void setOutstanding() {
+            readEntryOutstanding.inc();
+        }
+
         @Override
         public void handleV2Response(long ledgerId, long entryId,
                                      StatusCode status,
                                      BookieProtocol.Response response) {
+            readEntryOutstanding.dec();
             if (!(response instanceof BookieProtocol.ReadResponse)) {
                 return;
             }
@@ -1521,6 +1561,7 @@ public void handleV2Response(long ledgerId, long entryId,
 
         @Override
         public void handleV3Response(BookkeeperProtocol.Response response) {
+            readEntryOutstanding.dec();
             ReadResponse readResponse = response.getReadResponse();
             StatusCode status = response.getStatus() == StatusCode.EOK
                 ? readResponse.getStatus() : response.getStatus();
@@ -1735,16 +1776,23 @@ public void errorOut(final int rc) {
                     () -> writeComplete(rc, ledgerId, entryId, addr, ctx));
         }
 
+        @Override
+        public void setOutstanding() {
+            addEntryOutstanding.inc();
+        }
+
         @Override
         public void handleV2Response(
                 long ledgerId, long entryId, StatusCode status,
                 BookieProtocol.Response response) {
+            addEntryOutstanding.dec();
             handleResponse(ledgerId, entryId, status);
         }
 
         @Override
         public void handleV3Response(
                 BookkeeperProtocol.Response response) {
+            addEntryOutstanding.dec();
             AddResponse addResponse = response.getAddResponse();
             StatusCode status = response.getStatus() == StatusCode.EOK
                 ? addResponse.getStatus() : response.getStatus();
@@ -1912,13 +1960,29 @@ public void release() {
     /**
      * Connection listener.
      */
-    public class ConnectionFutureListener implements ChannelFutureListener {
+    class ConnectionFutureListener implements ChannelFutureListener {
+        private final long startTime;
+
+        ConnectionFutureListener(long startTime) {
+            this.startTime = startTime;
+        }
+
         @Override
         public void operationComplete(ChannelFuture future) throws Exception {
             LOG.debug("Channel connected ({}) {}", future.isSuccess(), future.channel());
             int rc;
             Queue<GenericCallback<PerChannelBookieClient>> oldPendingOps;
 
+            /* We update the connect timer based on whether the connect operation itself succeeded,
+             * regardless of whether there was a race. */
+            if (future.isSuccess()) {
+                PerChannelBookieClient.this
+                .connectTimer.registerSuccessfulEvent(MathUtils.elapsedNanos(startTime), TimeUnit.NANOSECONDS);
+            } else {
+                PerChannelBookieClient.this
+                .connectTimer.registerFailedEvent(MathUtils.elapsedNanos(startTime), TimeUnit.NANOSECONDS);
+            }
+
             synchronized (PerChannelBookieClient.this) {
                 if (future.isSuccess() && state == ConnectionState.CONNECTING && future.channel().isActive()) {
                     LOG.info("Successfully connected to bookie: {}", future.channel());


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services