You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@bookkeeper.apache.org by si...@apache.org on 2018/10/03 15:30:47 UTC

[bookkeeper] branch master updated: [TABLE SERVICE] [STATS] enable stats for grpc calls on both client and server

This is an automated email from the ASF dual-hosted git repository.

sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git


The following commit(s) were added to refs/heads/master by this push:
     new 97c6656  [TABLE SERVICE] [STATS] enable stats for grpc calls on both client and server
97c6656 is described below

commit 97c6656a9600d20763202ee6a7946383da1bd712
Author: Sijie Guo <gu...@gmail.com>
AuthorDate: Wed Oct 3 08:30:42 2018 -0700

    [TABLE SERVICE] [STATS] enable stats for grpc calls on both client and server
    
    Descriptions of the changes in this PR:
    
    *Motivation*
    
    We need visibility on both client and server on table service.
    
    *Changes*
    
    - introduce grpc monitoring interceptors for both client and server
    - enable monitoring interceptors on both client and server
    
    
    
    
    Author: Sijie Guo <si...@apache.org>
    
    Reviewers: Enrico Olivelli <eo...@gmail.com>, Jia Zhai <None>
    
    This closes #1731 from sijie/grpc_stats
---
 bookkeeper-benchmark/pom.xml                       |  11 +-
 .../apache/bookkeeper/test/TestStatsProvider.java  |   6 +-
 metadata-drivers/etcd/pom.xml                      |   7 +
 .../clients/config/StorageClientSettings.java      |   8 +
 .../clients/impl/channel/StorageServerChannel.java |  24 +-
 stream/common/pom.xml                              |   7 +
 .../bookkeeper/common/grpc/stats/ClientStats.java  | 126 ++++++++
 .../common/grpc/stats/MonitoringClientCall.java    |  52 ++++
 .../grpc/stats/MonitoringClientCallListener.java   |  69 +++++
 .../grpc/stats/MonitoringClientInterceptor.java    |  84 ++++++
 .../common/grpc/stats/MonitoringServerCall.java    |  59 ++++
 .../grpc/stats/MonitoringServerCallListener.java   |  54 ++++
 .../grpc/stats/MonitoringServerInterceptor.java    |  88 ++++++
 .../bookkeeper/common/grpc/stats/ServerStats.java  | 126 ++++++++
 .../bookkeeper/common/grpc/stats/package-info.java |  23 ++
 .../common/grpc/stats/ClientStatsTest.java         | 163 +++++++++++
 .../grpc/stats/GrpcStatsIntegrationTest.java       | 324 +++++++++++++++++++++
 .../common/grpc/stats/ServerStatsTest.java         | 163 +++++++++++
 .../bookkeeper/stream/server/grpc/GrpcServer.java  |  16 +-
 19 files changed, 1397 insertions(+), 13 deletions(-)

diff --git a/bookkeeper-benchmark/pom.xml b/bookkeeper-benchmark/pom.xml
index 163c1bd..9b86140 100644
--- a/bookkeeper-benchmark/pom.xml
+++ b/bookkeeper-benchmark/pom.xml
@@ -52,13 +52,20 @@
     <dependency>
       <groupId>org.apache.bookkeeper</groupId>
       <artifactId>bookkeeper-server</artifactId>
-      <version>${project.parent.version}</version>
+      <version>${project.version}</version>
       <scope>compile</scope>
     </dependency>
     <dependency>
       <groupId>org.apache.bookkeeper</groupId>
+      <artifactId>bookkeeper-common</artifactId>
+      <version>${project.version}</version>
+      <type>test-jar</type>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.bookkeeper</groupId>
       <artifactId>bookkeeper-server</artifactId>
-      <version>${project.parent.version}</version>
+      <version>${project.version}</version>
       <type>test-jar</type>
       <scope>test</scope>
     </dependency>
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/TestStatsProvider.java b/bookkeeper-common/src/test/java/org/apache/bookkeeper/test/TestStatsProvider.java
similarity index 97%
rename from bookkeeper-server/src/test/java/org/apache/bookkeeper/test/TestStatsProvider.java
rename to bookkeeper-common/src/test/java/org/apache/bookkeeper/test/TestStatsProvider.java
index 9b9fdb5..aca154e 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/TestStatsProvider.java
+++ b/bookkeeper-common/src/test/java/org/apache/bookkeeper/test/TestStatsProvider.java
@@ -1,5 +1,4 @@
 /*
- *
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -16,7 +15,6 @@
  * KIND, either express or implied.  See the License for the
  * specific language governing permissions and limitations
  * under the License.
- *
  */
 
 package org.apache.bookkeeper.test;
@@ -103,12 +101,12 @@ public class TestStatsProvider implements StatsProvider {
 
         @Override
         public void registerFailedEvent(long eventLatency, TimeUnit unit) {
-            registerFailedValue(unit.convert(eventLatency, TimeUnit.NANOSECONDS));
+            registerFailedValue(TimeUnit.NANOSECONDS.convert(eventLatency, unit));
         }
 
         @Override
         public void registerSuccessfulEvent(long eventLatency, TimeUnit unit) {
-            registerSuccessfulValue(unit.convert(eventLatency, TimeUnit.NANOSECONDS));
+            registerSuccessfulValue(TimeUnit.NANOSECONDS.convert(eventLatency, unit));
         }
 
         @Override
diff --git a/metadata-drivers/etcd/pom.xml b/metadata-drivers/etcd/pom.xml
index 9e463bd..0d9e0dd 100644
--- a/metadata-drivers/etcd/pom.xml
+++ b/metadata-drivers/etcd/pom.xml
@@ -56,6 +56,13 @@
         </dependency>
         <dependency>
             <groupId>org.apache.bookkeeper</groupId>
+            <artifactId>bookkeeper-common</artifactId>
+            <version>${project.version}</version>
+            <type>test-jar</type>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.bookkeeper</groupId>
             <artifactId>bookkeeper-server</artifactId>
             <version>${project.version}</version>
             <type>test-jar</type>
diff --git a/stream/clients/java/base/src/main/java/org/apache/bookkeeper/clients/config/StorageClientSettings.java b/stream/clients/java/base/src/main/java/org/apache/bookkeeper/clients/config/StorageClientSettings.java
index e78ca73..b5541be 100644
--- a/stream/clients/java/base/src/main/java/org/apache/bookkeeper/clients/config/StorageClientSettings.java
+++ b/stream/clients/java/base/src/main/java/org/apache/bookkeeper/clients/config/StorageClientSettings.java
@@ -23,6 +23,7 @@ import org.apache.bookkeeper.clients.resolver.EndpointResolver;
 import org.apache.bookkeeper.clients.utils.ClientConstants;
 import org.apache.bookkeeper.common.net.ServiceURI;
 import org.apache.bookkeeper.common.util.Backoff;
+import org.apache.bookkeeper.stats.StatsLogger;
 import org.inferred.freebuilder.FreeBuilder;
 
 /**
@@ -73,6 +74,13 @@ public interface StorageClientSettings {
     Optional<String> clientName();
 
     /**
+     * Configure a stats logger to collect stats exposed by this client.
+     *
+     * @return stats logger.
+     */
+    Optional<StatsLogger> statsLogger();
+
+    /**
      * Configure a backoff policy for the client.
      *
      * <p>There are a few default backoff policies defined in {@link org.apache.bookkeeper.common.util.Backoff}.
diff --git a/stream/clients/java/base/src/main/java/org/apache/bookkeeper/clients/impl/channel/StorageServerChannel.java b/stream/clients/java/base/src/main/java/org/apache/bookkeeper/clients/impl/channel/StorageServerChannel.java
index e5206de..afacce1 100644
--- a/stream/clients/java/base/src/main/java/org/apache/bookkeeper/clients/impl/channel/StorageServerChannel.java
+++ b/stream/clients/java/base/src/main/java/org/apache/bookkeeper/clients/impl/channel/StorageServerChannel.java
@@ -31,6 +31,7 @@ import org.apache.bookkeeper.clients.config.StorageClientSettings;
 import org.apache.bookkeeper.clients.impl.container.StorageContainerClientInterceptor;
 import org.apache.bookkeeper.clients.resolver.EndpointResolver;
 import org.apache.bookkeeper.clients.utils.GrpcUtils;
+import org.apache.bookkeeper.common.grpc.stats.MonitoringClientInterceptor;
 import org.apache.bookkeeper.stream.proto.common.Endpoint;
 import org.apache.bookkeeper.stream.proto.kv.rpc.TableServiceGrpc;
 import org.apache.bookkeeper.stream.proto.kv.rpc.TableServiceGrpc.TableServiceFutureStub;
@@ -49,11 +50,24 @@ import org.apache.bookkeeper.stream.proto.storage.StorageContainerServiceGrpc.St
 public class StorageServerChannel implements AutoCloseable {
 
     public static Function<Endpoint, StorageServerChannel> factory(StorageClientSettings settings) {
-        return (endpoint) -> new StorageServerChannel(
-            endpoint,
-            Optional.empty(),
-            settings.usePlaintext(),
-            settings.endpointResolver());
+        return new Function<Endpoint, StorageServerChannel>() {
+
+            private final Optional<MonitoringClientInterceptor> interceptor =
+                settings.statsLogger().map(statsLogger ->
+                    MonitoringClientInterceptor.create(statsLogger, true));
+
+            @Override
+            public StorageServerChannel apply(Endpoint endpoint) {
+                StorageServerChannel channel = new StorageServerChannel(
+                    endpoint,
+                    Optional.empty(),
+                    settings.usePlaintext(),
+                    settings.endpointResolver());
+                return interceptor
+                    .map(interceptor -> channel.intercept(interceptor))
+                    .orElse(channel);
+            }
+        };
     }
 
     private final Optional<String> token;
diff --git a/stream/common/pom.xml b/stream/common/pom.xml
index 40c8b07..c5d0339 100644
--- a/stream/common/pom.xml
+++ b/stream/common/pom.xml
@@ -48,6 +48,13 @@
       <optional>true</optional>
     </dependency>
     <dependency>
+      <groupId>org.apache.bookkeeper</groupId>
+      <artifactId>bookkeeper-common</artifactId>
+      <version>${project.version}</version>
+      <type>test-jar</type>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
       <groupId>org.apache.bookkeeper.tests</groupId>
       <artifactId>stream-storage-tests-common</artifactId>
       <version>${project.version}</version>
diff --git a/stream/common/src/main/java/org/apache/bookkeeper/common/grpc/stats/ClientStats.java b/stream/common/src/main/java/org/apache/bookkeeper/common/grpc/stats/ClientStats.java
new file mode 100644
index 0000000..2b7ba21
--- /dev/null
+++ b/stream/common/src/main/java/org/apache/bookkeeper/common/grpc/stats/ClientStats.java
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.bookkeeper.common.grpc.stats;
+
+import io.grpc.MethodDescriptor;
+import io.grpc.MethodDescriptor.MethodType;
+import io.grpc.Status.Code;
+import java.util.Optional;
+import java.util.concurrent.TimeUnit;
+import org.apache.bookkeeper.stats.Counter;
+import org.apache.bookkeeper.stats.NullStatsLogger;
+import org.apache.bookkeeper.stats.OpStatsLogger;
+import org.apache.bookkeeper.stats.StatsLogger;
+
+/**
+ * Per-method client side stats for grpc calls.
+ *
+ * <p>Tracks calls started and completed, stream messages sent and received, and optionally call latency.
+ */
+class ClientStats {
+
+    private final Counter rpcStarted;
+    private final Counter rpcCompleted;
+    private final Counter streamMessagesReceived;
+    private final Counter streamMessagesSent;
+    private final Optional<OpStatsLogger> completedLatencyMicros;
+
+    private ClientStats(StatsLogger rootStatsLogger,
+                        boolean includeLatencyHistograms,
+                        boolean streamRequests,
+                        boolean streamResponses) {
+        // A direction that is not streamed gets its counter from the null stats logger instead.
+        StatsLogger receivedLogger = streamResponses ? rootStatsLogger : NullStatsLogger.INSTANCE;
+        StatsLogger sentLogger = streamRequests ? rootStatsLogger : NullStatsLogger.INSTANCE;
+        this.rpcStarted = rootStatsLogger.getCounter("grpc_started");
+        this.rpcCompleted = rootStatsLogger.getCounter("grpc_completed");
+        this.streamMessagesReceived = receivedLogger.getCounter("grpc_msg_received");
+        this.streamMessagesSent = sentLogger.getCounter("grpc_msg_sent");
+        this.completedLatencyMicros = includeLatencyHistograms
+            ? Optional.of(rootStatsLogger.getOpStatsLogger("grpc_latency_micros"))
+            : Optional.empty();
+    }
+
+    /** Counts one more started call. */
+    public void recordCallStarted() {
+        rpcStarted.inc();
+    }
+
+    /** Counts one more completed call; the status code is currently not used. */
+    public void recordClientHandled(Code code) {
+        rpcCompleted.inc();
+    }
+
+    /** Counts one more message sent on the call. */
+    public void recordStreamMessageSent() {
+        streamMessagesSent.inc();
+    }
+
+    /** Counts one more message received on the call. */
+    public void recordStreamMessageReceived() {
+        streamMessagesReceived.inc();
+    }
+
+    /** Returns whether a latency logger was configured for this method. */
+    public boolean shouldRecordLatency() {
+        return completedLatencyMicros.isPresent();
+    }
+
+    /** Registers a successful or failed latency sample, in microseconds, if latency is tracked. */
+    public void recordLatency(boolean success, long latencyMicros) {
+        if (!completedLatencyMicros.isPresent()) {
+            return;
+        }
+        OpStatsLogger latencyLogger = completedLatencyMicros.get();
+        if (success) {
+            latencyLogger.registerSuccessfulEvent(latencyMicros, TimeUnit.MICROSECONDS);
+        } else {
+            latencyLogger.registerFailedEvent(latencyMicros, TimeUnit.MICROSECONDS);
+        }
+    }
+
+    /**
+     * Builds {@link ClientStats} instances for individual grpc methods.
+     */
+    static class Factory {
+
+        private final boolean includeLatencyHistograms;
+
+        Factory(boolean includeLatencyHistograms) {
+            this.includeLatencyHistograms = includeLatencyHistograms;
+        }
+
+        /** Creates the {@link ClientStats} of the supplied method, scoped by its bare method name. */
+        <ReqT, RespT> ClientStats createMetricsForMethod(MethodDescriptor<ReqT, RespT> methodDescriptor,
+                                                         StatsLogger statsLogger) {
+            String fullName = methodDescriptor.getFullMethodName();
+            // drop the service name plus the one-character separator that follows it
+            int methodStart = MethodDescriptor.extractFullServiceName(fullName).length() + 1;
+            MethodType type = methodDescriptor.getType();
+            boolean bidi = type == MethodType.BIDI_STREAMING;
+            return new ClientStats(
+                statsLogger.scope(fullName.substring(methodStart)),
+                includeLatencyHistograms,
+                bidi || type == MethodType.CLIENT_STREAMING,
+                bidi || type == MethodType.SERVER_STREAMING);
+        }
+    }
+
+}
diff --git a/stream/common/src/main/java/org/apache/bookkeeper/common/grpc/stats/MonitoringClientCall.java b/stream/common/src/main/java/org/apache/bookkeeper/common/grpc/stats/MonitoringClientCall.java
new file mode 100644
index 0000000..763ca98
--- /dev/null
+++ b/stream/common/src/main/java/org/apache/bookkeeper/common/grpc/stats/MonitoringClientCall.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.bookkeeper.common.grpc.stats;
+
+import io.grpc.ClientCall;
+import io.grpc.ForwardingClientCall.SimpleForwardingClientCall;
+import io.grpc.Metadata;
+
+/**
+ * A {@link SimpleForwardingClientCall} that records call starts and sent messages in {@link ClientStats}.
+ */
+class MonitoringClientCall<ReqT, RespT> extends SimpleForwardingClientCall<ReqT, RespT> {
+
+    private final ClientStats stats;
+
+    MonitoringClientCall(ClientCall<ReqT, RespT> delegate, ClientStats stats) {
+        super(delegate);
+        this.stats = stats;
+    }
+
+    /** Counts the call as started, then starts it with a listener that records the response side. */
+    @Override
+    public void start(Listener<RespT> responseListener, Metadata headers) {
+        stats.recordCallStarted();
+        Listener<RespT> monitoringListener = new MonitoringClientCallListener<>(responseListener, stats);
+        super.start(monitoringListener, headers);
+    }
+
+    /** Counts the outgoing message before handing it to the delegate. */
+    @Override
+    public void sendMessage(ReqT message) {
+        stats.recordStreamMessageSent();
+        super.sendMessage(message);
+    }
+}
diff --git a/stream/common/src/main/java/org/apache/bookkeeper/common/grpc/stats/MonitoringClientCallListener.java b/stream/common/src/main/java/org/apache/bookkeeper/common/grpc/stats/MonitoringClientCallListener.java
new file mode 100644
index 0000000..651439e
--- /dev/null
+++ b/stream/common/src/main/java/org/apache/bookkeeper/common/grpc/stats/MonitoringClientCallListener.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.bookkeeper.common.grpc.stats;
+
+import io.grpc.ClientCall.Listener;
+import io.grpc.ForwardingClientCallListener;
+import io.grpc.Metadata;
+import io.grpc.Status;
+import org.apache.bookkeeper.common.util.MathUtils;
+
+/**
+ * A {@link ForwardingClientCallListener} that records received messages, completion and latency of a client call.
+ */
+class MonitoringClientCallListener<RespT> extends ForwardingClientCallListener<RespT> {
+
+    private final Listener<RespT> delegate;
+    private final ClientStats stats;
+    private final long startNanos;
+
+    MonitoringClientCallListener(Listener<RespT> delegate, ClientStats stats) {
+        this.delegate = delegate;
+        this.stats = stats;
+        this.startNanos = MathUtils.nowInNano();
+    }
+
+    @Override
+    protected Listener<RespT> delegate() {
+        return delegate;
+    }
+
+    @Override
+    public void onMessage(RespT message) {
+        stats.recordStreamMessageReceived();
+        super.onMessage(message);
+    }
+
+    /** Counts the completed call and, when enabled, its latency since this listener was created. */
+    @Override
+    public void onClose(Status status, Metadata trailers) {
+        stats.recordClientHandled(status.getCode());
+        if (stats.shouldRecordLatency()) {
+            // compare by code: an OK status is not necessarily the Status.OK instance
+            stats.recordLatency(status.isOk(), MathUtils.elapsedMicroSec(startNanos));
+        }
+        super.onClose(status, trailers);
+    }
+
+    @Override
+    public String toString() {
+        return delegate.toString();
+    }
+}
diff --git a/stream/common/src/main/java/org/apache/bookkeeper/common/grpc/stats/MonitoringClientInterceptor.java b/stream/common/src/main/java/org/apache/bookkeeper/common/grpc/stats/MonitoringClientInterceptor.java
new file mode 100644
index 0000000..868f249
--- /dev/null
+++ b/stream/common/src/main/java/org/apache/bookkeeper/common/grpc/stats/MonitoringClientInterceptor.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.bookkeeper.common.grpc.stats;
+
+import io.grpc.CallOptions;
+import io.grpc.Channel;
+import io.grpc.ClientCall;
+import io.grpc.ClientInterceptor;
+import io.grpc.MethodDescriptor;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import org.apache.bookkeeper.common.grpc.stats.ClientStats.Factory;
+import org.apache.bookkeeper.stats.StatsLogger;
+
+/**
+ * A {@link ClientInterceptor} that sends stats about grpc calls to stats logger.
+ *
+ * <p>Stats are kept per grpc method and shared by all calls of that method.
+ */
+public class MonitoringClientInterceptor implements ClientInterceptor {
+
+    /**
+     * Builds a monitoring client interceptor reporting to the given stats logger.
+     *
+     * @param statsLogger stats logger to collect grpc stats
+     * @param includeLatencyHistograms whether latency histograms are recorded in addition to counters
+     * @return a monitoring client interceptor
+     */
+    public static MonitoringClientInterceptor create(StatsLogger statsLogger,
+                                                     boolean includeLatencyHistograms) {
+        Factory factory = new Factory(includeLatencyHistograms);
+        return new MonitoringClientInterceptor(factory, statsLogger);
+    }
+
+    private final Factory statsFactory;
+    private final StatsLogger statsLogger;
+    private final ConcurrentMap<String, ClientStats> methods;
+
+    private MonitoringClientInterceptor(Factory statsFactory,
+                                        StatsLogger statsLogger) {
+        this.statsFactory = statsFactory;
+        this.statsLogger = statsLogger;
+        this.methods = new ConcurrentHashMap<>();
+    }
+
+    /** Returns the stats cached under the method's full name, creating them on first use. */
+    private ClientStats getMethodStats(MethodDescriptor<?, ?> method) {
+        String fullMethodName = method.getFullMethodName();
+        ClientStats cached = methods.get(fullMethodName);
+        if (null == cached) {
+            ClientStats created = statsFactory.createMetricsForMethod(method, statsLogger);
+            // when another thread registered stats first, its instance is the one kept and returned
+            ClientStats raced = methods.putIfAbsent(fullMethodName, created);
+            cached = (null != raced) ? raced : created;
+        }
+        return cached;
+    }
+
+    /** Wraps the call created by the next channel so that it is recorded in the method's stats. */
+    @Override
+    public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall(
+        MethodDescriptor<ReqT, RespT> method, CallOptions callOptions, Channel next) {
+        ClientStats stats = getMethodStats(method);
+        ClientCall<ReqT, RespT> call = next.newCall(method, callOptions);
+        return new MonitoringClientCall<>(call, stats);
+    }
+}
diff --git a/stream/common/src/main/java/org/apache/bookkeeper/common/grpc/stats/MonitoringServerCall.java b/stream/common/src/main/java/org/apache/bookkeeper/common/grpc/stats/MonitoringServerCall.java
new file mode 100644
index 0000000..abb0969
--- /dev/null
+++ b/stream/common/src/main/java/org/apache/bookkeeper/common/grpc/stats/MonitoringServerCall.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.bookkeeper.common.grpc.stats;
+
+import io.grpc.ForwardingServerCall.SimpleForwardingServerCall;
+import io.grpc.Metadata;
+import io.grpc.ServerCall;
+import io.grpc.Status;
+import org.apache.bookkeeper.common.util.MathUtils;
+
+/**
+ * A {@link SimpleForwardingServerCall} that records call starts, sent messages and completion in server stats.
+ */
+class MonitoringServerCall<ReqT, RespT> extends SimpleForwardingServerCall<ReqT, RespT> {
+
+    private final ServerStats stats;
+    private final long startNanos;
+
+    MonitoringServerCall(ServerCall<ReqT, RespT> delegate, ServerStats stats) {
+        super(delegate);
+        this.stats = stats;
+        this.startNanos = MathUtils.nowInNano();
+        stats.recordCallStarted();
+    }
+
+    @Override
+    public void sendMessage(RespT message) {
+        stats.recordStreamMessageSent();
+        super.sendMessage(message);
+    }
+
+    /** Records the handled call and, when enabled, its latency since construction, then closes the delegate. */
+    @Override
+    public void close(Status status, Metadata trailers) {
+        stats.recordServerHandled(status.getCode());
+        if (stats.shouldRecordLatency()) {
+            // compare by code: an OK status is not necessarily the Status.OK instance
+            stats.recordLatency(status.isOk(), MathUtils.elapsedMicroSec(startNanos));
+        }
+        super.close(status, trailers);
+    }
+}
diff --git a/stream/common/src/main/java/org/apache/bookkeeper/common/grpc/stats/MonitoringServerCallListener.java b/stream/common/src/main/java/org/apache/bookkeeper/common/grpc/stats/MonitoringServerCallListener.java
new file mode 100644
index 0000000..6eef140
--- /dev/null
+++ b/stream/common/src/main/java/org/apache/bookkeeper/common/grpc/stats/MonitoringServerCallListener.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.bookkeeper.common.grpc.stats;
+
+import io.grpc.ForwardingServerCallListener;
+import io.grpc.ServerCall.Listener;
+
+/**
+ * A {@link ForwardingServerCallListener} that monitors stats on grpc servers.
+ */
+class MonitoringServerCallListener<ReqT> extends ForwardingServerCallListener<ReqT> {
+
+    private final Listener<ReqT> delegate;
+    private final ServerStats stats;
+
+    MonitoringServerCallListener(Listener<ReqT> delegate, ServerStats stats) {
+        this.delegate = delegate;
+        this.stats = stats;
+    }
+
+    @Override
+    protected Listener<ReqT> delegate() {
+        return delegate;
+    }
+
+    /** Counts the request message received on the call before forwarding it to the delegate. */
+    @Override
+    public void onMessage(ReqT message) {
+        stats.recordStreamMessageReceived();
+        super.onMessage(message);
+    }
+
+    @Override
+    public String toString() {
+        return delegate.toString();
+    }
+}
diff --git a/stream/common/src/main/java/org/apache/bookkeeper/common/grpc/stats/MonitoringServerInterceptor.java b/stream/common/src/main/java/org/apache/bookkeeper/common/grpc/stats/MonitoringServerInterceptor.java
new file mode 100644
index 0000000..1c3ed25
--- /dev/null
+++ b/stream/common/src/main/java/org/apache/bookkeeper/common/grpc/stats/MonitoringServerInterceptor.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.bookkeeper.common.grpc.stats;
+
+import io.grpc.Metadata;
+import io.grpc.MethodDescriptor;
+import io.grpc.ServerCall;
+import io.grpc.ServerCall.Listener;
+import io.grpc.ServerCallHandler;
+import io.grpc.ServerInterceptor;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import org.apache.bookkeeper.common.grpc.stats.ServerStats.Factory;
+import org.apache.bookkeeper.stats.StatsLogger;
+
+/**
+ * A {@link ServerInterceptor} that sends stats about grpc calls to stats logger.
+ *
+ * <p>Stats are kept per grpc method and shared by all calls of that method.
+ */
+public class MonitoringServerInterceptor implements ServerInterceptor {
+
+    /**
+     * Builds a monitoring server interceptor reporting to the given stats logger.
+     *
+     * @param statsLogger stats logger to collect grpc stats
+     * @param includeLatencyHistograms whether latency histograms are recorded in addition to counters
+     * @return a monitoring server interceptor
+     */
+    public static MonitoringServerInterceptor create(StatsLogger statsLogger,
+                                                     boolean includeLatencyHistograms) {
+        Factory factory = new Factory(includeLatencyHistograms);
+        return new MonitoringServerInterceptor(factory, statsLogger);
+    }
+
+    private final Factory statsFactory;
+    private final StatsLogger statsLogger;
+    private final ConcurrentMap<String, ServerStats> methods;
+
+    private MonitoringServerInterceptor(Factory statsFactory, StatsLogger statsLogger) {
+        this.statsFactory = statsFactory;
+        this.statsLogger = statsLogger;
+        this.methods = new ConcurrentHashMap<>();
+    }
+
+    /** Returns the stats cached under the method's full name, creating them on first use. */
+    private ServerStats getMethodStats(MethodDescriptor<?, ?> method) {
+        String fullMethodName = method.getFullMethodName();
+        ServerStats cached = methods.get(fullMethodName);
+        if (null == cached) {
+            ServerStats created = statsFactory.createMetricsForMethod(method, statsLogger);
+            // when another thread registered stats first, its instance is the one kept and returned
+            ServerStats raced = methods.putIfAbsent(fullMethodName, created);
+            cached = (null != raced) ? raced : created;
+        }
+        return cached;
+    }
+
+    /**
+     * Wraps the call for stats recording, starts it on the next handler and wraps the returned listener.
+     */
+    @Override
+    public <ReqT, RespT> Listener<ReqT> interceptCall(ServerCall<ReqT, RespT> call,
+                                                      Metadata headers,
+                                                      ServerCallHandler<ReqT, RespT> next) {
+        ServerStats stats = getMethodStats(call.getMethodDescriptor());
+        ServerCall<ReqT, RespT> monitoringCall = new MonitoringServerCall<>(call, stats);
+        Listener<ReqT> listener = next.startCall(monitoringCall, headers);
+        return new MonitoringServerCallListener<>(listener, stats);
+    }
+}
diff --git a/stream/common/src/main/java/org/apache/bookkeeper/common/grpc/stats/ServerStats.java b/stream/common/src/main/java/org/apache/bookkeeper/common/grpc/stats/ServerStats.java
new file mode 100644
index 0000000..eae8348
--- /dev/null
+++ b/stream/common/src/main/java/org/apache/bookkeeper/common/grpc/stats/ServerStats.java
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.bookkeeper.common.grpc.stats;
+
+import io.grpc.MethodDescriptor;
+import io.grpc.MethodDescriptor.MethodType;
+import io.grpc.Status.Code;
+import java.util.Optional;
+import java.util.concurrent.TimeUnit;
+import org.apache.bookkeeper.stats.Counter;
+import org.apache.bookkeeper.stats.NullStatsLogger;
+import org.apache.bookkeeper.stats.OpStatsLogger;
+import org.apache.bookkeeper.stats.StatsLogger;
+
+/**
+ * Server side monitoring for grpc services.
+ */
+class ServerStats {
+
+    private final Counter rpcStarted;
+    private final Counter rpcCompleted;
+    private final Counter streamMessagesReceived;
+    private final Counter streamMessagesSent;
+    private final Optional<OpStatsLogger> completedLatencyMicros;
+
+    private ServerStats(StatsLogger rootStatsLogger,
+                        boolean includeLatencyHistograms,
+                        boolean streamRequests,
+                        boolean streamResponses) {
+        this.rpcStarted = rootStatsLogger.getCounter("grpc_started");
+        this.rpcCompleted = rootStatsLogger.getCounter("grpc_completed");
+        if (streamRequests) {
+            this.streamMessagesReceived = rootStatsLogger.getCounter("grpc_msg_received");
+        } else {
+            this.streamMessagesReceived = NullStatsLogger.INSTANCE.getCounter("grpc_msg_received");
+        }
+        if (streamResponses) {
+            this.streamMessagesSent = rootStatsLogger.getCounter("grpc_msg_sent");
+        } else {
+            this.streamMessagesSent = NullStatsLogger.INSTANCE.getCounter("grpc_msg_sent");
+        }
+        if (includeLatencyHistograms) {
+            this.completedLatencyMicros = Optional.of(
+                rootStatsLogger.getOpStatsLogger("grpc_latency_micros")
+            );
+        } else {
+            this.completedLatencyMicros = Optional.empty();
+        }
+    }
+
+    public void recordCallStarted() {
+        rpcStarted.inc();
+    }
+
+    public void recordServerHandled(Code code) {
+        rpcCompleted.inc();
+    }
+
+    public void recordStreamMessageSent() {
+        streamMessagesSent.inc();
+    }
+
+    public void recordStreamMessageReceived() {
+        streamMessagesReceived.inc();
+    }
+
+    public boolean shouldRecordLatency() {
+        return completedLatencyMicros.isPresent();
+    }
+
+    public void recordLatency(boolean success, long latencyMicros) {
+        completedLatencyMicros.ifPresent(latencyLogger -> {
+            if (success) {
+                latencyLogger.registerSuccessfulEvent(latencyMicros, TimeUnit.MICROSECONDS);
+            } else {
+                latencyLogger.registerFailedEvent(latencyMicros, TimeUnit.MICROSECONDS);
+            }
+        });
+    }
+
+    /**
+     * Knows how to produce {@link ServerStats} instances for individual methods.
+     */
+    static class Factory {
+
+        private final boolean includeLatencyHistograms;
+
+        Factory(boolean includeLatencyHistograms) {
+            this.includeLatencyHistograms = includeLatencyHistograms;
+        }
+
+        /** Creates a {@link ServerStats} for the supplied method. */
+        <ReqT, RespT> ServerStats createMetricsForMethod(MethodDescriptor<ReqT, RespT> methodDescriptor,
+                                                         StatsLogger statsLogger) {
+
+            String fullMethodName = methodDescriptor.getFullMethodName();
+            String serviceName = MethodDescriptor.extractFullServiceName(fullMethodName);
+            String methodName = fullMethodName.substring(serviceName.length() + 1);
+
+            MethodType type = methodDescriptor.getType();
+            return new ServerStats(
+                statsLogger.scope(methodName),
+                includeLatencyHistograms,
+                type == MethodType.CLIENT_STREAMING || type == MethodType.BIDI_STREAMING,
+                type == MethodType.SERVER_STREAMING || type == MethodType.BIDI_STREAMING);
+        }
+    }
+
+}
diff --git a/stream/common/src/main/java/org/apache/bookkeeper/common/grpc/stats/package-info.java b/stream/common/src/main/java/org/apache/bookkeeper/common/grpc/stats/package-info.java
new file mode 100644
index 0000000..a3e2d3e
--- /dev/null
+++ b/stream/common/src/main/java/org/apache/bookkeeper/common/grpc/stats/package-info.java
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/**
+ * Collecting grpc related stats.
+ */
+package org.apache.bookkeeper.common.grpc.stats;
\ No newline at end of file
diff --git a/stream/common/src/test/java/org/apache/bookkeeper/common/grpc/stats/ClientStatsTest.java b/stream/common/src/test/java/org/apache/bookkeeper/common/grpc/stats/ClientStatsTest.java
new file mode 100644
index 0000000..d5cf2d3
--- /dev/null
+++ b/stream/common/src/test/java/org/apache/bookkeeper/common/grpc/stats/ClientStatsTest.java
@@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.bookkeeper.common.grpc.stats;
+
+import static org.junit.Assert.assertEquals;
+
+import io.grpc.MethodDescriptor;
+import io.grpc.Status;
+import java.util.concurrent.TimeUnit;
+import org.apache.bookkeeper.common.grpc.stats.ClientStats.Factory;
+import org.apache.bookkeeper.stats.StatsLogger;
+import org.apache.bookkeeper.test.TestStatsProvider;
+import org.apache.bookkeeper.test.TestStatsProvider.TestOpStatsLogger;
+import org.bookkeeper.tests.proto.rpc.PingPongServiceGrpc;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * Unit test {@link ClientStats}.
+ */
+public class ClientStatsTest {
+
+    private Factory factoryWithHistograms;
+    private Factory factoryWithoutHistograms;
+    private TestStatsProvider statsProvider;
+
+    @Before
+    public void setup() {
+        this.statsProvider = new TestStatsProvider();
+        this.factoryWithHistograms = new Factory(true);
+        this.factoryWithoutHistograms = new Factory(false);
+    }
+
+    @Test
+    public void testClientStatsWithHistogram() {
+        testClientStats(factoryWithHistograms, true);
+    }
+
+    @Test
+    public void testClientStatsWithoutHistogram() {
+        testClientStats(factoryWithoutHistograms, false);
+    }
+
+    private void testClientStats(Factory clientStatsFactory,
+                                 boolean includeLatencyHistogram) {
+        // test unary method
+        MethodDescriptor<?, ?> unaryMethod = PingPongServiceGrpc.getPingPongMethod();
+        testClientStats(
+            clientStatsFactory,
+            includeLatencyHistogram,
+            unaryMethod,
+            "PingPong",
+            "unary",
+            1,
+            1,
+            0,
+            0
+        );
+        // test client streaming
+        MethodDescriptor<?, ?> clientStreamingMethod = PingPongServiceGrpc.getLotsOfPingsMethod();
+        testClientStats(
+            clientStatsFactory,
+            includeLatencyHistogram,
+            clientStreamingMethod,
+            "LotsOfPings",
+            "client_streaming",
+            1,
+            1,
+            1,
+            0
+        );
+        // test server streaming
+        MethodDescriptor<?, ?> serverStreamingMethod = PingPongServiceGrpc.getLotsOfPongsMethod();
+        testClientStats(
+            clientStatsFactory,
+            includeLatencyHistogram,
+            serverStreamingMethod,
+            "LotsOfPongs",
+            "server_streaming",
+            1,
+            1,
+            0,
+            2
+        );
+        // test bidi streaming
+        MethodDescriptor<?, ?> biStreamingMethod = PingPongServiceGrpc.getBidiPingPongMethod();
+        testClientStats(
+            clientStatsFactory,
+            includeLatencyHistogram,
+            biStreamingMethod,
+            "BidiPingPong",
+            "bidi_streaming",
+            1,
+            1,
+            1,
+            2
+        );
+    }
+
+    private void testClientStats(Factory clientStatsFactory,
+                                 boolean includeLatencyHistogram,
+                                 MethodDescriptor<?, ?> method,
+                                 String methodName,
+                                 String statsScope,
+                                 long expectedCallStarted,
+                                 long expectedCallCompleted,
+                                 long expectedStreamMsgsSent,
+                                 long expectedStreamMsgsReceived) {
+        StatsLogger statsLogger = statsProvider.getStatsLogger(statsScope);
+        ClientStats unaryStats = clientStatsFactory.createMetricsForMethod(
+            method,
+            statsLogger
+        );
+        unaryStats.recordCallStarted();
+        assertEquals(
+            expectedCallStarted,
+            statsLogger.scope(methodName).getCounter("grpc_started").get().longValue());
+        unaryStats.recordClientHandled(Status.OK.getCode());
+        assertEquals(
+            expectedCallCompleted,
+            statsLogger.scope(methodName).getCounter("grpc_completed").get().longValue());
+        unaryStats.recordStreamMessageSent();
+        assertEquals(
+            expectedStreamMsgsSent,
+            statsLogger.scope(methodName).getCounter("grpc_msg_sent").get().longValue());
+        unaryStats.recordStreamMessageReceived();
+        unaryStats.recordStreamMessageReceived();
+        assertEquals(
+            expectedStreamMsgsReceived,
+            statsLogger.scope(methodName).getCounter("grpc_msg_received").get().longValue());
+        long latencyMicros = 12345L;
+        unaryStats.recordLatency(true, latencyMicros);
+        TestOpStatsLogger opStatsLogger =
+            (TestOpStatsLogger) statsLogger.scope(methodName).getOpStatsLogger("grpc_latency_micros");
+        if (includeLatencyHistogram) {
+            assertEquals(1, opStatsLogger.getSuccessCount());
+            assertEquals(
+                TimeUnit.MICROSECONDS.toNanos(latencyMicros),
+                (long) opStatsLogger.getSuccessAverage());
+        } else {
+            assertEquals(0, opStatsLogger.getSuccessCount());
+            assertEquals(0, (long) opStatsLogger.getSuccessAverage());
+        }
+    }
+
+}
diff --git a/stream/common/src/test/java/org/apache/bookkeeper/common/grpc/stats/GrpcStatsIntegrationTest.java b/stream/common/src/test/java/org/apache/bookkeeper/common/grpc/stats/GrpcStatsIntegrationTest.java
new file mode 100644
index 0000000..e34f30e
--- /dev/null
+++ b/stream/common/src/test/java/org/apache/bookkeeper/common/grpc/stats/GrpcStatsIntegrationTest.java
@@ -0,0 +1,324 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.bookkeeper.common.grpc.stats;
+
+import static org.apache.bookkeeper.common.concurrent.FutureUtils.result;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+
+import io.grpc.Channel;
+import io.grpc.ClientInterceptors;
+import io.grpc.ManagedChannel;
+import io.grpc.Server;
+import io.grpc.ServerInterceptors;
+import io.grpc.ServerServiceDefinition;
+import io.grpc.inprocess.InProcessChannelBuilder;
+import io.grpc.inprocess.InProcessServerBuilder;
+import io.grpc.stub.StreamObserver;
+import io.grpc.util.MutableHandlerRegistry;
+import java.util.Iterator;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadLocalRandom;
+import org.apache.bookkeeper.common.concurrent.FutureUtils;
+import org.apache.bookkeeper.test.TestStatsProvider;
+import org.apache.bookkeeper.test.TestStatsProvider.TestOpStatsLogger;
+import org.apache.bookkeeper.test.TestStatsProvider.TestStatsLogger;
+import org.apache.bookkeeper.tests.rpc.PingPongService;
+import org.bookkeeper.tests.proto.rpc.PingPongServiceGrpc;
+import org.bookkeeper.tests.proto.rpc.PingPongServiceGrpc.PingPongServiceBlockingStub;
+import org.bookkeeper.tests.proto.rpc.PingPongServiceGrpc.PingPongServiceStub;
+import org.bookkeeper.tests.proto.rpc.PingRequest;
+import org.bookkeeper.tests.proto.rpc.PongResponse;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * End-to-end integration test on grpc stats.
+ */
+public class GrpcStatsIntegrationTest {
+
+    private static final int NUM_PONGS_PER_PING = 10;
+    private static final String SERVICE_NAME = "pingpong";
+
+    private Server server;
+    private PingPongService service;
+    private ManagedChannel channel;
+    private Channel monitoredChannel;
+    private PingPongServiceBlockingStub client;
+    private PingPongServiceStub clientNonBlocking;
+    private TestStatsProvider statsProvider;
+    private TestStatsLogger clientStatsLogger;
+    private TestStatsLogger serverStatsLogger;
+
+
+    @Before
+    public void setup() throws Exception {
+        statsProvider = new TestStatsProvider();
+        clientStatsLogger = statsProvider.getStatsLogger("client");
+        serverStatsLogger = statsProvider.getStatsLogger("server");
+        service = new PingPongService(NUM_PONGS_PER_PING);
+        ServerServiceDefinition monitoredService = ServerInterceptors.intercept(
+            service,
+            MonitoringServerInterceptor.create(serverStatsLogger, true)
+        );
+        MutableHandlerRegistry registry = new MutableHandlerRegistry();
+        server = InProcessServerBuilder
+            .forName(SERVICE_NAME)
+            .fallbackHandlerRegistry(registry)
+            .directExecutor()
+            .build()
+            .start();
+        registry.addService(monitoredService);
+
+        channel = InProcessChannelBuilder.forName(SERVICE_NAME)
+            .usePlaintext()
+            .build();
+        monitoredChannel = ClientInterceptors.intercept(
+            channel,
+            MonitoringClientInterceptor.create(clientStatsLogger, true)
+        );
+        client = PingPongServiceGrpc.newBlockingStub(monitoredChannel);
+        clientNonBlocking = PingPongServiceGrpc.newStub(monitoredChannel);
+    }
+
+    @After
+    public void teardown() {
+        if (null != channel) {
+            channel.shutdown();
+        }
+        if (null != server) {
+            server.shutdown();
+        }
+    }
+
+    private void assertStats(String methodName,
+                             long numCalls,
+                             long numClientMsgSent,
+                             long numClientMsgReceived,
+                             long numServerMsgSent,
+                             long numServerMsgReceived) {
+        // client stats
+        assertEquals(
+            numCalls,
+            clientStatsLogger.scope(methodName).getCounter("grpc_started").get().longValue()
+        );
+        assertEquals(
+            numCalls,
+            clientStatsLogger.scope(methodName).getCounter("grpc_completed").get().longValue()
+        );
+        assertEquals(
+            numClientMsgSent,
+            clientStatsLogger.scope(methodName).getCounter("grpc_msg_sent").get().longValue()
+        );
+        assertEquals(
+            numClientMsgReceived,
+            clientStatsLogger.scope(methodName).getCounter("grpc_msg_received").get().longValue()
+        );
+        TestOpStatsLogger opStatsLogger =
+            (TestOpStatsLogger) clientStatsLogger.scope(methodName).getOpStatsLogger("grpc_latency_micros");
+        assertEquals(
+            numCalls,
+            opStatsLogger.getSuccessCount()
+        );
+        // server stats
+        assertEquals(
+            numCalls,
+            serverStatsLogger.scope(methodName).getCounter("grpc_started").get().longValue()
+        );
+        assertEquals(
+            numCalls,
+            serverStatsLogger.scope(methodName).getCounter("grpc_completed").get().longValue()
+        );
+        assertEquals(
+            numServerMsgSent,
+            serverStatsLogger.scope(methodName).getCounter("grpc_msg_sent").get().longValue()
+        );
+        assertEquals(
+            numServerMsgReceived,
+            serverStatsLogger.scope(methodName).getCounter("grpc_msg_received").get().longValue()
+        );
+        opStatsLogger =
+            (TestOpStatsLogger) serverStatsLogger.scope(methodName).getOpStatsLogger("grpc_latency_micros");
+        assertEquals(
+            numCalls,
+            opStatsLogger.getSuccessCount()
+        );
+    }
+
+    @Test
+    public void testUnary() {
+        long sequence = ThreadLocalRandom.current().nextLong();
+        PingRequest request = PingRequest.newBuilder()
+            .setSequence(sequence)
+            .build();
+        PongResponse response = client.pingPong(request);
+        assertEquals(sequence, response.getLastSequence());
+        assertEquals(1, response.getNumPingReceived());
+        assertEquals(0, response.getSlotId());
+
+        // verify the stats
+        assertStats(
+            "PingPong",
+            1,
+            0,
+            0,
+            0,
+            0);
+    }
+
+    @Test
+    public void testServerStreaming() {
+        long sequence = ThreadLocalRandom.current().nextLong(100000);
+        PingRequest request = PingRequest.newBuilder()
+            .setSequence(sequence)
+            .build();
+        Iterator<PongResponse> respIter = client.lotsOfPongs(request);
+        int count = 0;
+        while (respIter.hasNext()) {
+            PongResponse resp = respIter.next();
+            assertEquals(sequence, resp.getLastSequence());
+            assertEquals(1, resp.getNumPingReceived());
+            assertEquals(count, resp.getSlotId());
+            ++count;
+        }
+
+        assertStats(
+            "LotsOfPongs",
+            1,
+            0,
+            NUM_PONGS_PER_PING,
+            NUM_PONGS_PER_PING,
+            0);
+    }
+
+    @Test
+    public void testClientStreaming() throws Exception {
+        final int numPings = 100;
+        final long sequence = ThreadLocalRandom.current().nextLong(100000);
+        final CompletableFuture<Void> respFuture = new CompletableFuture<>();
+        final LinkedBlockingQueue<PongResponse> respQueue = new LinkedBlockingQueue<>();
+        StreamObserver<PingRequest> pinger = clientNonBlocking.lotsOfPings(new StreamObserver<PongResponse>() {
+            @Override
+            public void onNext(PongResponse resp) {
+                respQueue.offer(resp);
+            }
+
+            @Override
+            public void onError(Throwable t) {
+                respFuture.completeExceptionally(t);
+            }
+
+            @Override
+            public void onCompleted() {
+                FutureUtils.complete(respFuture, null);
+            }
+        });
+
+        for (int i = 0; i < numPings; i++) {
+            PingRequest request = PingRequest.newBuilder()
+                .setSequence(sequence + i)
+                .build();
+            pinger.onNext(request);
+        }
+        pinger.onCompleted();
+
+        // wait for response to be received.
+        result(respFuture);
+
+        assertEquals(1, respQueue.size());
+
+        PongResponse resp = respQueue.take();
+        assertEquals(sequence + numPings - 1, resp.getLastSequence());
+        assertEquals(numPings, resp.getNumPingReceived());
+        assertEquals(0, resp.getSlotId());
+
+        assertStats(
+            "LotsOfPings",
+            1,
+            numPings,
+            0,
+            0,
+            numPings
+        );
+    }
+
+    @Test
+    public void testBidiStreaming() throws Exception {
+        final int numPings = 100;
+
+        final CompletableFuture<Void> respFuture = new CompletableFuture<>();
+        final LinkedBlockingQueue<PongResponse> respQueue = new LinkedBlockingQueue<>();
+        StreamObserver<PingRequest> pinger = clientNonBlocking.bidiPingPong(new StreamObserver<PongResponse>() {
+            @Override
+            public void onNext(PongResponse resp) {
+                respQueue.offer(resp);
+            }
+
+            @Override
+            public void onError(Throwable t) {
+                respFuture.completeExceptionally(t);
+            }
+
+            @Override
+            public void onCompleted() {
+                FutureUtils.complete(respFuture, null);
+            }
+        });
+
+        final LinkedBlockingQueue<PingRequest> reqQueue = new LinkedBlockingQueue<>();
+        for (int i = 0; i < numPings; i++) {
+            final long sequence = ThreadLocalRandom.current().nextLong(100000);
+            PingRequest request = PingRequest.newBuilder()
+                .setSequence(sequence)
+                .build();
+            reqQueue.put(request);
+            pinger.onNext(request);
+        }
+        pinger.onCompleted();
+
+        // wait for response to be received
+        result(respFuture);
+
+        assertEquals(numPings, respQueue.size());
+
+        int count = 0;
+        for (PingRequest request : reqQueue) {
+            PongResponse response = respQueue.take();
+
+            assertEquals(request.getSequence(), response.getLastSequence());
+            assertEquals(++count, response.getNumPingReceived());
+            assertEquals(0, response.getSlotId());
+        }
+        assertNull(respQueue.poll());
+        assertEquals(numPings, count);
+
+        assertStats(
+            "BidiPingPong",
+            1,
+            numPings,
+            numPings,
+            numPings,
+            numPings
+        );
+    }
+
+}
diff --git a/stream/common/src/test/java/org/apache/bookkeeper/common/grpc/stats/ServerStatsTest.java b/stream/common/src/test/java/org/apache/bookkeeper/common/grpc/stats/ServerStatsTest.java
new file mode 100644
index 0000000..8300aa6
--- /dev/null
+++ b/stream/common/src/test/java/org/apache/bookkeeper/common/grpc/stats/ServerStatsTest.java
@@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.bookkeeper.common.grpc.stats;
+
+import static org.junit.Assert.assertEquals;
+
+import io.grpc.MethodDescriptor;
+import io.grpc.Status;
+import java.util.concurrent.TimeUnit;
+import org.apache.bookkeeper.common.grpc.stats.ServerStats.Factory;
+import org.apache.bookkeeper.stats.StatsLogger;
+import org.apache.bookkeeper.test.TestStatsProvider;
+import org.apache.bookkeeper.test.TestStatsProvider.TestOpStatsLogger;
+import org.bookkeeper.tests.proto.rpc.PingPongServiceGrpc;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * Unit test {@link ServerStats}.
+ */
+public class ServerStatsTest {
+
+    private Factory factoryWithHistograms;
+    private Factory factoryWithoutHistograms;
+    private TestStatsProvider statsProvider;
+
+    @Before
+    public void setup() {
+        this.statsProvider = new TestStatsProvider();
+        this.factoryWithHistograms = new Factory(true);
+        this.factoryWithoutHistograms = new Factory(false);
+    }
+
+    @Test
+    public void testServerStatsWithHistogram() {
+        testServerStats(factoryWithHistograms, true);
+    }
+
+    @Test
+    public void testServerStatsWithoutHistogram() {
+        testServerStats(factoryWithoutHistograms, false);
+    }
+
+    private void testServerStats(Factory clientStatsFactory,
+                                 boolean includeLatencyHistogram) {
+        // test unary method
+        MethodDescriptor<?, ?> unaryMethod = PingPongServiceGrpc.getPingPongMethod();
+        testServerStats(
+            clientStatsFactory,
+            includeLatencyHistogram,
+            unaryMethod,
+            "PingPong",
+            "unary",
+            1,
+            1,
+            0,
+            0
+        );
+        // test client streaming
+        MethodDescriptor<?, ?> clientStreamingMethod = PingPongServiceGrpc.getLotsOfPingsMethod();
+        testServerStats(
+            clientStatsFactory,
+            includeLatencyHistogram,
+            clientStreamingMethod,
+            "LotsOfPings",
+            "client_streaming",
+            1,
+            1,
+            0,
+            2
+        );
+        // test server streaming
+        MethodDescriptor<?, ?> serverStreamingMethod = PingPongServiceGrpc.getLotsOfPongsMethod();
+        testServerStats(
+            clientStatsFactory,
+            includeLatencyHistogram,
+            serverStreamingMethod,
+            "LotsOfPongs",
+            "server_streaming",
+            1,
+            1,
+            1,
+            0
+        );
+        // test bidi streaming
+        MethodDescriptor<?, ?> biStreamingMethod = PingPongServiceGrpc.getBidiPingPongMethod();
+        testServerStats(
+            clientStatsFactory,
+            includeLatencyHistogram,
+            biStreamingMethod,
+            "BidiPingPong",
+            "bidi_streaming",
+            1,
+            1,
+            1,
+            2
+        );
+    }
+
+    private void testServerStats(Factory clientStatsFactory,
+                                 boolean includeLatencyHistogram,
+                                 MethodDescriptor<?, ?> method,
+                                 String methodName,
+                                 String statsScope,
+                                 long expectedCallStarted,
+                                 long expectedCallCompleted,
+                                 long expectedStreamMsgsSent,
+                                 long expectedStreamMsgsReceived) {
+        StatsLogger statsLogger = statsProvider.getStatsLogger(statsScope);
+        ServerStats unaryStats = clientStatsFactory.createMetricsForMethod(
+            method,
+            statsLogger
+        );
+        unaryStats.recordCallStarted();
+        assertEquals(
+            expectedCallStarted,
+            statsLogger.scope(methodName).getCounter("grpc_started").get().longValue());
+        unaryStats.recordServerHandled(Status.OK.getCode());
+        assertEquals(
+            expectedCallCompleted,
+            statsLogger.scope(methodName).getCounter("grpc_completed").get().longValue());
+        unaryStats.recordStreamMessageSent();
+        assertEquals(
+            expectedStreamMsgsSent,
+            statsLogger.scope(methodName).getCounter("grpc_msg_sent").get().longValue());
+        unaryStats.recordStreamMessageReceived();
+        unaryStats.recordStreamMessageReceived();
+        assertEquals(
+            expectedStreamMsgsReceived,
+            statsLogger.scope(methodName).getCounter("grpc_msg_received").get().longValue());
+        long latencyMicros = 12345L;
+        unaryStats.recordLatency(true, latencyMicros);
+        TestOpStatsLogger opStatsLogger =
+            (TestOpStatsLogger) statsLogger.scope(methodName).getOpStatsLogger("grpc_latency_micros");
+        if (includeLatencyHistogram) {
+            assertEquals(1, opStatsLogger.getSuccessCount());
+            assertEquals(
+                TimeUnit.MICROSECONDS.toNanos(latencyMicros),
+                (long) opStatsLogger.getSuccessAverage());
+        } else {
+            assertEquals(0, opStatsLogger.getSuccessCount());
+            assertEquals(0, (long) opStatsLogger.getSuccessAverage());
+        }
+    }
+
+}
diff --git a/stream/server/src/main/java/org/apache/bookkeeper/stream/server/grpc/GrpcServer.java b/stream/server/src/main/java/org/apache/bookkeeper/stream/server/grpc/GrpcServer.java
index 7a5f163..1ef2642 100644
--- a/stream/server/src/main/java/org/apache/bookkeeper/stream/server/grpc/GrpcServer.java
+++ b/stream/server/src/main/java/org/apache/bookkeeper/stream/server/grpc/GrpcServer.java
@@ -18,12 +18,14 @@ import com.google.common.annotations.VisibleForTesting;
 import io.grpc.HandlerRegistry;
 import io.grpc.Server;
 import io.grpc.ServerBuilder;
+import io.grpc.ServerInterceptors;
 import io.grpc.ServerServiceDefinition;
 import io.grpc.inprocess.InProcessServerBuilder;
 import java.io.IOException;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.bookkeeper.common.component.AbstractLifecycleComponent;
 import org.apache.bookkeeper.common.grpc.proxy.ProxyHandlerRegistry;
+import org.apache.bookkeeper.common.grpc.stats.MonitoringServerInterceptor;
 import org.apache.bookkeeper.stats.StatsLogger;
 import org.apache.bookkeeper.stream.proto.common.Endpoint;
 import org.apache.bookkeeper.stream.server.conf.StorageServerConfiguration;
@@ -75,13 +77,23 @@ public class GrpcServer extends AbstractLifecycleComponent<StorageServerConfigur
             }
             this.grpcServer = serverBuilder.build();
         } else {
+            MonitoringServerInterceptor monitoringInterceptor =
+                MonitoringServerInterceptor.create(statsLogger.scope("services"), true);
             ProxyHandlerRegistry.Builder proxyRegistryBuilder = ProxyHandlerRegistry.newBuilder()
                 .setChannelFinder(storageContainerStore);
             for (ServerServiceDefinition definition : GrpcServices.create(null)) {
-                proxyRegistryBuilder = proxyRegistryBuilder.addService(definition);
+                ServerServiceDefinition monitoredService = ServerInterceptors.intercept(
+                    definition,
+                    monitoringInterceptor
+                );
+                proxyRegistryBuilder = proxyRegistryBuilder.addService(monitoredService);
             }
+            ServerServiceDefinition locationService = ServerInterceptors.intercept(
+                new GrpcStorageContainerService(storageContainerStore),
+                monitoringInterceptor
+            );
             this.grpcServer = ServerBuilder.forPort(this.myEndpoint.getPort())
-                .addService(new GrpcStorageContainerService(storageContainerStore))
+                .addService(locationService)
                 .fallbackHandlerRegistry(proxyRegistryBuilder.build())
                 .build();
         }