You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@bookkeeper.apache.org by si...@apache.org on 2018/01/30 19:46:29 UTC

[bookkeeper] branch master updated: ISSUE #934: Additional stats to track netty latencies in PCBC

This is an automated email from the ASF dual-hosted git repository.

sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git


The following commit(s) were added to refs/heads/master by this push:
     new f3166dd  ISSUE #934: Additional stats to track netty latencies in PCBC
f3166dd is described below

commit f3166dde6f0429be121110ee534a5574d136248a
Author: Kishore Udayashankar <ku...@salesforce.com>
AuthorDate: Tue Jan 30 11:46:22 2018 -0800

    ISSUE #934: Additional stats to track netty latencies in PCBC
    
    Descriptions of the changes in this PR:
    
    (bug W-4058645) Add metrics to measure Netty stack latency
    - Counters to track outstanding read/write operations
    - Collect Netty IO rate and latency histograms
    
    rev ayegorov
    
    PerChannelBookieClient: add additional stats
    
    Adds:
    - exceptionCounter
    - connectTimer
    - addEntryOutstanding
    - readEntryOutstanding
    - nettyOpLogger
    
    Signed-off-by: Kishore Kasi Udayashankar <kudayashankar@salesforce.com>
    
    Master Issue: #934
    
    Author: Kishore Udayashankar <ku...@salesforce.com>
    
    Reviewers: Enrico Olivelli <eo...@gmail.com>, Sijie Guo <si...@apache.org>
    
    This closes #971 from kishorekasi/pcbc-stats, closes #934
---
 .../bookkeeper/client/BookKeeperClientStats.java   |  5 ++
 .../bookkeeper/proto/PerChannelBookieClient.java   | 72 ++++++++++++++++++++--
 2 files changed, 73 insertions(+), 4 deletions(-)

diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeperClientStats.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeperClientStats.java
index 50ee81c..60058e3 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeperClientStats.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeperClientStats.java
@@ -66,4 +66,9 @@ public interface BookKeeperClientStats {
     String TIMEOUT_GET_BOOKIE_INFO = "TIMEOUT_GET_BOOKIE_INFO";
     String CHANNEL_START_TLS_OP = "START_TLS";
     String CHANNEL_TIMEOUT_START_TLS_OP = "TIMEOUT_START_TLS";
+    String NETTY_EXCEPTION_CNT = "NETTY_EXCEPTION_CNT";
+    String CLIENT_CONNECT_TIMER = "CLIENT_CONNECT_TIMER";
+    String ADD_OP_OUTSTANDING = "ADD_OP_OUTSTANDING";
+    String READ_OP_OUTSTANDING = "READ_OP_OUTSTANDING";
+    String NETTY_OPS = "NETTY_OPS";
 }
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java
index e0199ff..a486d57 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java
@@ -109,6 +109,7 @@ import org.apache.bookkeeper.proto.BookkeeperProtocol.Response;
 import org.apache.bookkeeper.proto.BookkeeperProtocol.StatusCode;
 import org.apache.bookkeeper.proto.BookkeeperProtocol.WriteLacRequest;
 import org.apache.bookkeeper.proto.BookkeeperProtocol.WriteLacResponse;
+import org.apache.bookkeeper.stats.Counter;
 import org.apache.bookkeeper.stats.NullStatsLogger;
 import org.apache.bookkeeper.stats.OpStatsLogger;
 import org.apache.bookkeeper.stats.StatsLogger;
@@ -173,6 +174,12 @@ public class PerChannelBookieClient extends ChannelInboundHandlerAdapter {
     private final OpStatsLogger getBookieInfoTimeoutOpLogger;
     private final OpStatsLogger startTLSOpLogger;
     private final OpStatsLogger startTLSTimeoutOpLogger;
+    private final OpStatsLogger connectTimer;
+    private final Counter exceptionCounter;
+    private final Counter addEntryOutstanding;
+    private final Counter readEntryOutstanding;
+    /* Collect stats on all ops that flow through the Netty pipeline. */
+    private final OpStatsLogger nettyOpLogger;
 
     private final boolean useV2WireProtocol;
 
@@ -270,6 +277,11 @@ public class PerChannelBookieClient extends ChannelInboundHandlerAdapter {
         getBookieInfoTimeoutOpLogger = statsLogger.getOpStatsLogger(BookKeeperClientStats.TIMEOUT_GET_BOOKIE_INFO);
         startTLSOpLogger = statsLogger.getOpStatsLogger(BookKeeperClientStats.CHANNEL_START_TLS_OP);
         startTLSTimeoutOpLogger = statsLogger.getOpStatsLogger(BookKeeperClientStats.CHANNEL_TIMEOUT_START_TLS_OP);
+        exceptionCounter = statsLogger.getCounter(BookKeeperClientStats.NETTY_EXCEPTION_CNT);
+        connectTimer = statsLogger.getOpStatsLogger(BookKeeperClientStats.CLIENT_CONNECT_TIMER);
+        addEntryOutstanding = statsLogger.getCounter(BookKeeperClientStats.ADD_OP_OUTSTANDING);
+        readEntryOutstanding = statsLogger.getCounter(BookKeeperClientStats.READ_OP_OUTSTANDING);
+        nettyOpLogger = statsLogger.getOpStatsLogger(BookKeeperClientStats.NETTY_OPS);
 
         this.pcbcPool = pcbcPool;
 
@@ -360,6 +372,7 @@ public class PerChannelBookieClient extends ChannelInboundHandlerAdapter {
     }
 
     protected ChannelFuture connect() {
+        final long startTime = MathUtils.nowInNano();
         if (LOG.isDebugEnabled()) {
             LOG.debug("Connecting to bookie: {}", addr);
         }
@@ -427,7 +440,8 @@ public class PerChannelBookieClient extends ChannelInboundHandlerAdapter {
         }
 
         ChannelFuture future = bootstrap.connect(bookieAddr);
-        future.addListener(new ConnectionFutureListener());
+        future.addListener(new ConnectionFutureListener(startTime));
+
         return future;
     }
 
@@ -861,7 +875,21 @@ public class PerChannelBookieClient extends ChannelInboundHandlerAdapter {
         }
 
         try {
-            channel.writeAndFlush(request, channel.voidPromise());
+            final long startTime = MathUtils.nowInNano();
+            ChannelFuture future = channel.writeAndFlush(request);
+            future.addListener(new ChannelFutureListener() {
+                @Override
+                public void operationComplete(ChannelFuture future) throws Exception {
+                    if (future.isSuccess()) {
+                        nettyOpLogger.registerSuccessfulEvent(MathUtils.elapsedNanos(startTime),
+                                TimeUnit.NANOSECONDS);
+                        completionObjects.get(key).setOutstanding();
+                    } else {
+                        nettyOpLogger.registerFailedEvent(MathUtils.elapsedNanos(startTime),
+                                TimeUnit.NANOSECONDS);
+                    }
+                }
+            });
         } catch (Throwable e) {
             LOG.warn("Operation {} failed", requestToString(request), e);
             errorOut(key);
@@ -968,6 +996,7 @@ public class PerChannelBookieClient extends ChannelInboundHandlerAdapter {
      */
     @Override
     public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
+        exceptionCounter.inc();
         if (cause instanceof CorruptedFrameException || cause instanceof TooLongFrameException) {
             LOG.error("Corrupted frame received from bookie: {}", ctx.channel().remoteAddress());
             ctx.close();
@@ -1152,7 +1181,7 @@ public class PerChannelBookieClient extends ChannelInboundHandlerAdapter {
     private void readV3Response(final Response response) {
         final BKPacketHeader header = response.getHeader();
 
-        final CompletionValue completionValue = completionObjects.remove(newCompletionKey(header.getTxnId(),
+        final CompletionValue completionValue = completionObjects.get(newCompletionKey(header.getTxnId(),
                 header.getOperation()));
 
         if (null == completionValue) {
@@ -1177,6 +1206,8 @@ public class PerChannelBookieClient extends ChannelInboundHandlerAdapter {
                 }
             });
         }
+
+        completionObjects.remove(newCompletionKey(header.getTxnId(), header.getOperation()));
     }
 
     void initTLSHandshake() {
@@ -1328,6 +1359,9 @@ public class PerChannelBookieClient extends ChannelInboundHandlerAdapter {
 
         public abstract void errorOut();
         public abstract void errorOut(int rc);
+        public void setOutstanding() {
+            // no-op
+        }
 
         protected void errorOutAndRunCallback(final Runnable callback) {
             executor.submitOrdered(ledgerId,
@@ -1504,9 +1538,15 @@ public class PerChannelBookieClient extends ChannelInboundHandlerAdapter {
         }
 
         @Override
+        public void setOutstanding() {
+            readEntryOutstanding.inc();
+        }
+
+        @Override
         public void handleV2Response(long ledgerId, long entryId,
                                      StatusCode status,
                                      BookieProtocol.Response response) {
+            readEntryOutstanding.dec();
             if (!(response instanceof BookieProtocol.ReadResponse)) {
                 return;
             }
@@ -1521,6 +1561,7 @@ public class PerChannelBookieClient extends ChannelInboundHandlerAdapter {
 
         @Override
         public void handleV3Response(BookkeeperProtocol.Response response) {
+            readEntryOutstanding.dec();
             ReadResponse readResponse = response.getReadResponse();
             StatusCode status = response.getStatus() == StatusCode.EOK
                 ? readResponse.getStatus() : response.getStatus();
@@ -1736,15 +1777,22 @@ public class PerChannelBookieClient extends ChannelInboundHandlerAdapter {
         }
 
         @Override
+        public void setOutstanding() {
+            addEntryOutstanding.inc();
+        }
+
+        @Override
         public void handleV2Response(
                 long ledgerId, long entryId, StatusCode status,
                 BookieProtocol.Response response) {
+            addEntryOutstanding.dec();
             handleResponse(ledgerId, entryId, status);
         }
 
         @Override
         public void handleV3Response(
                 BookkeeperProtocol.Response response) {
+            addEntryOutstanding.dec();
             AddResponse addResponse = response.getAddResponse();
             StatusCode status = response.getStatus() == StatusCode.EOK
                 ? addResponse.getStatus() : response.getStatus();
@@ -1912,13 +1960,29 @@ public class PerChannelBookieClient extends ChannelInboundHandlerAdapter {
     /**
      * Connection listener.
      */
-    public class ConnectionFutureListener implements ChannelFutureListener {
+    class ConnectionFutureListener implements ChannelFutureListener {
+        private final long startTime;
+
+        ConnectionFutureListener(long startTime) {
+            this.startTime = startTime;
+        }
+
         @Override
         public void operationComplete(ChannelFuture future) throws Exception {
             LOG.debug("Channel connected ({}) {}", future.isSuccess(), future.channel());
             int rc;
             Queue<GenericCallback<PerChannelBookieClient>> oldPendingOps;
 
+            /* We fill in the timer based on whether the connect operation itself succeeded regardless of
+             * whether there was a race */
+            if (future.isSuccess()) {
+                PerChannelBookieClient.this
+                .connectTimer.registerSuccessfulEvent(MathUtils.elapsedNanos(startTime), TimeUnit.NANOSECONDS);
+            } else {
+                PerChannelBookieClient.this
+                .connectTimer.registerFailedEvent(MathUtils.elapsedNanos(startTime), TimeUnit.NANOSECONDS);
+            }
+
             synchronized (PerChannelBookieClient.this) {
                 if (future.isSuccess() && state == ConnectionState.CONNECTING && future.channel().isActive()) {
                     LOG.info("Successfully connected to bookie: {}", future.channel());

-- 
To stop receiving notification emails like this one, please contact
sijie@apache.org.