You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@bookkeeper.apache.org by si...@apache.org on 2018/10/03 15:30:47 UTC
[bookkeeper] branch master updated: [TABLE SERVICE] [STATS] enable
stats for grpc calls on both client and server
This is an automated email from the ASF dual-hosted git repository.
sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git
The following commit(s) were added to refs/heads/master by this push:
new 97c6656 [TABLE SERVICE] [STATS] enable stats for grpc calls on both client and server
97c6656 is described below
commit 97c6656a9600d20763202ee6a7946383da1bd712
Author: Sijie Guo <gu...@gmail.com>
AuthorDate: Wed Oct 3 08:30:42 2018 -0700
[TABLE SERVICE] [STATS] enable stats for grpc calls on both client and server
Descriptions of the changes in this PR:
*Motivation*
We need visibility into the table service on both the client and the server.
*Changes*
- introduce grpc monitoring interceptors for both client and server
- enable monitoring interceptors on both client and server
Author: Sijie Guo <si...@apache.org>
Reviewers: Enrico Olivelli <eo...@gmail.com>, Jia Zhai <None>
This closes #1731 from sijie/grpc_stats
---
bookkeeper-benchmark/pom.xml | 11 +-
.../apache/bookkeeper/test/TestStatsProvider.java | 6 +-
metadata-drivers/etcd/pom.xml | 7 +
.../clients/config/StorageClientSettings.java | 8 +
.../clients/impl/channel/StorageServerChannel.java | 24 +-
stream/common/pom.xml | 7 +
.../bookkeeper/common/grpc/stats/ClientStats.java | 126 ++++++++
.../common/grpc/stats/MonitoringClientCall.java | 52 ++++
.../grpc/stats/MonitoringClientCallListener.java | 69 +++++
.../grpc/stats/MonitoringClientInterceptor.java | 84 ++++++
.../common/grpc/stats/MonitoringServerCall.java | 59 ++++
.../grpc/stats/MonitoringServerCallListener.java | 54 ++++
.../grpc/stats/MonitoringServerInterceptor.java | 88 ++++++
.../bookkeeper/common/grpc/stats/ServerStats.java | 126 ++++++++
.../bookkeeper/common/grpc/stats/package-info.java | 23 ++
.../common/grpc/stats/ClientStatsTest.java | 163 +++++++++++
.../grpc/stats/GrpcStatsIntegrationTest.java | 324 +++++++++++++++++++++
.../common/grpc/stats/ServerStatsTest.java | 163 +++++++++++
.../bookkeeper/stream/server/grpc/GrpcServer.java | 16 +-
19 files changed, 1397 insertions(+), 13 deletions(-)
diff --git a/bookkeeper-benchmark/pom.xml b/bookkeeper-benchmark/pom.xml
index 163c1bd..9b86140 100644
--- a/bookkeeper-benchmark/pom.xml
+++ b/bookkeeper-benchmark/pom.xml
@@ -52,13 +52,20 @@
<dependency>
<groupId>org.apache.bookkeeper</groupId>
<artifactId>bookkeeper-server</artifactId>
- <version>${project.parent.version}</version>
+ <version>${project.version}</version>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>org.apache.bookkeeper</groupId>
+ <artifactId>bookkeeper-common</artifactId>
+ <version>${project.version}</version>
+ <type>test-jar</type>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.bookkeeper</groupId>
<artifactId>bookkeeper-server</artifactId>
- <version>${project.parent.version}</version>
+ <version>${project.version}</version>
<type>test-jar</type>
<scope>test</scope>
</dependency>
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/TestStatsProvider.java b/bookkeeper-common/src/test/java/org/apache/bookkeeper/test/TestStatsProvider.java
similarity index 97%
rename from bookkeeper-server/src/test/java/org/apache/bookkeeper/test/TestStatsProvider.java
rename to bookkeeper-common/src/test/java/org/apache/bookkeeper/test/TestStatsProvider.java
index 9b9fdb5..aca154e 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/TestStatsProvider.java
+++ b/bookkeeper-common/src/test/java/org/apache/bookkeeper/test/TestStatsProvider.java
@@ -1,5 +1,4 @@
/*
- *
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -16,7 +15,6 @@
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
- *
*/
package org.apache.bookkeeper.test;
@@ -103,12 +101,12 @@ public class TestStatsProvider implements StatsProvider {
@Override
public void registerFailedEvent(long eventLatency, TimeUnit unit) {
- registerFailedValue(unit.convert(eventLatency, TimeUnit.NANOSECONDS));
+ registerFailedValue(TimeUnit.NANOSECONDS.convert(eventLatency, unit));
}
@Override
public void registerSuccessfulEvent(long eventLatency, TimeUnit unit) {
- registerSuccessfulValue(unit.convert(eventLatency, TimeUnit.NANOSECONDS));
+ registerSuccessfulValue(TimeUnit.NANOSECONDS.convert(eventLatency, unit));
}
@Override
diff --git a/metadata-drivers/etcd/pom.xml b/metadata-drivers/etcd/pom.xml
index 9e463bd..0d9e0dd 100644
--- a/metadata-drivers/etcd/pom.xml
+++ b/metadata-drivers/etcd/pom.xml
@@ -56,6 +56,13 @@
</dependency>
<dependency>
<groupId>org.apache.bookkeeper</groupId>
+ <artifactId>bookkeeper-common</artifactId>
+ <version>${project.version}</version>
+ <type>test-jar</type>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.bookkeeper</groupId>
<artifactId>bookkeeper-server</artifactId>
<version>${project.version}</version>
<type>test-jar</type>
diff --git a/stream/clients/java/base/src/main/java/org/apache/bookkeeper/clients/config/StorageClientSettings.java b/stream/clients/java/base/src/main/java/org/apache/bookkeeper/clients/config/StorageClientSettings.java
index e78ca73..b5541be 100644
--- a/stream/clients/java/base/src/main/java/org/apache/bookkeeper/clients/config/StorageClientSettings.java
+++ b/stream/clients/java/base/src/main/java/org/apache/bookkeeper/clients/config/StorageClientSettings.java
@@ -23,6 +23,7 @@ import org.apache.bookkeeper.clients.resolver.EndpointResolver;
import org.apache.bookkeeper.clients.utils.ClientConstants;
import org.apache.bookkeeper.common.net.ServiceURI;
import org.apache.bookkeeper.common.util.Backoff;
+import org.apache.bookkeeper.stats.StatsLogger;
import org.inferred.freebuilder.FreeBuilder;
/**
@@ -73,6 +74,13 @@ public interface StorageClientSettings {
Optional<String> clientName();
/**
+ * Configure a stats logger to collect stats exposed by this client.
+ *
+ * @return stats logger.
+ */
+ Optional<StatsLogger> statsLogger();
+
+ /**
* Configure a backoff policy for the client.
*
* <p>There are a few default backoff policies defined in {@link org.apache.bookkeeper.common.util.Backoff}.
diff --git a/stream/clients/java/base/src/main/java/org/apache/bookkeeper/clients/impl/channel/StorageServerChannel.java b/stream/clients/java/base/src/main/java/org/apache/bookkeeper/clients/impl/channel/StorageServerChannel.java
index e5206de..afacce1 100644
--- a/stream/clients/java/base/src/main/java/org/apache/bookkeeper/clients/impl/channel/StorageServerChannel.java
+++ b/stream/clients/java/base/src/main/java/org/apache/bookkeeper/clients/impl/channel/StorageServerChannel.java
@@ -31,6 +31,7 @@ import org.apache.bookkeeper.clients.config.StorageClientSettings;
import org.apache.bookkeeper.clients.impl.container.StorageContainerClientInterceptor;
import org.apache.bookkeeper.clients.resolver.EndpointResolver;
import org.apache.bookkeeper.clients.utils.GrpcUtils;
+import org.apache.bookkeeper.common.grpc.stats.MonitoringClientInterceptor;
import org.apache.bookkeeper.stream.proto.common.Endpoint;
import org.apache.bookkeeper.stream.proto.kv.rpc.TableServiceGrpc;
import org.apache.bookkeeper.stream.proto.kv.rpc.TableServiceGrpc.TableServiceFutureStub;
@@ -49,11 +50,24 @@ import org.apache.bookkeeper.stream.proto.storage.StorageContainerServiceGrpc.St
public class StorageServerChannel implements AutoCloseable {
public static Function<Endpoint, StorageServerChannel> factory(StorageClientSettings settings) {
- return (endpoint) -> new StorageServerChannel(
- endpoint,
- Optional.empty(),
- settings.usePlaintext(),
- settings.endpointResolver());
+ return new Function<Endpoint, StorageServerChannel>() {
+
+ private final Optional<MonitoringClientInterceptor> interceptor =
+ settings.statsLogger().map(statsLogger ->
+ MonitoringClientInterceptor.create(statsLogger, true));
+
+ @Override
+ public StorageServerChannel apply(Endpoint endpoint) {
+ StorageServerChannel channel = new StorageServerChannel(
+ endpoint,
+ Optional.empty(),
+ settings.usePlaintext(),
+ settings.endpointResolver());
+ return interceptor
+ .map(interceptor -> channel.intercept(interceptor))
+ .orElse(channel);
+ }
+ };
}
private final Optional<String> token;
diff --git a/stream/common/pom.xml b/stream/common/pom.xml
index 40c8b07..c5d0339 100644
--- a/stream/common/pom.xml
+++ b/stream/common/pom.xml
@@ -48,6 +48,13 @@
<optional>true</optional>
</dependency>
<dependency>
+ <groupId>org.apache.bookkeeper</groupId>
+ <artifactId>bookkeeper-common</artifactId>
+ <version>${project.version}</version>
+ <type>test-jar</type>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
<groupId>org.apache.bookkeeper.tests</groupId>
<artifactId>stream-storage-tests-common</artifactId>
<version>${project.version}</version>
diff --git a/stream/common/src/main/java/org/apache/bookkeeper/common/grpc/stats/ClientStats.java b/stream/common/src/main/java/org/apache/bookkeeper/common/grpc/stats/ClientStats.java
new file mode 100644
index 0000000..2b7ba21
--- /dev/null
+++ b/stream/common/src/main/java/org/apache/bookkeeper/common/grpc/stats/ClientStats.java
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.bookkeeper.common.grpc.stats;
+
+import io.grpc.MethodDescriptor;
+import io.grpc.MethodDescriptor.MethodType;
+import io.grpc.Status.Code;
+import java.util.Optional;
+import java.util.concurrent.TimeUnit;
+import org.apache.bookkeeper.stats.Counter;
+import org.apache.bookkeeper.stats.NullStatsLogger;
+import org.apache.bookkeeper.stats.OpStatsLogger;
+import org.apache.bookkeeper.stats.StatsLogger;
+
+/**
+ * Per-method client side counters and optional latency stats for grpc calls.
+ */
+class ClientStats {
+
+ private final Counter rpcStarted;
+ private final Counter rpcCompleted;
+ private final Counter streamMessagesReceived;
+ private final Counter streamMessagesSent;
+ private final Optional<OpStatsLogger> completedLatencyMicros;
+
+ private ClientStats(StatsLogger rootStatsLogger,
+ boolean includeLatencyHistograms,
+ boolean streamRequests,
+ boolean streamResponses) {
+ this.rpcStarted = rootStatsLogger.getCounter("grpc_started");
+ this.rpcCompleted = rootStatsLogger.getCounter("grpc_completed");
+ if (streamResponses) {
+ this.streamMessagesReceived = rootStatsLogger.getCounter("grpc_msg_received");
+ } else {
+ this.streamMessagesReceived = NullStatsLogger.INSTANCE.getCounter("grpc_msg_received");
+ }
+ if (streamRequests) {
+ this.streamMessagesSent = rootStatsLogger.getCounter("grpc_msg_sent");
+ } else {
+ this.streamMessagesSent = NullStatsLogger.INSTANCE.getCounter("grpc_msg_sent");
+ }
+ if (includeLatencyHistograms) {
+ this.completedLatencyMicros = Optional.of(
+ rootStatsLogger.getOpStatsLogger("grpc_latency_micros")
+ );
+ } else {
+ this.completedLatencyMicros = Optional.empty();
+ }
+ }
+
+ public void recordCallStarted() {
+ rpcStarted.inc();
+ }
+
+ public void recordClientHandled(Code code) {
+ rpcCompleted.inc();
+ }
+
+ public void recordStreamMessageSent() {
+ streamMessagesSent.inc();
+ }
+
+ public void recordStreamMessageReceived() {
+ streamMessagesReceived.inc();
+ }
+
+ public boolean shouldRecordLatency() {
+ return completedLatencyMicros.isPresent();
+ }
+
+ public void recordLatency(boolean success, long latencyMicros) {
+ completedLatencyMicros.ifPresent(latencyLogger -> {
+ if (success) {
+ latencyLogger.registerSuccessfulEvent(latencyMicros, TimeUnit.MICROSECONDS);
+ } else {
+ latencyLogger.registerFailedEvent(latencyMicros, TimeUnit.MICROSECONDS);
+ }
+ });
+ }
+
+ /**
+ * Produces {@link ClientStats} instances for individual grpc methods.
+ */
+ static class Factory {
+
+ private final boolean includeLatencyHistograms;
+
+ Factory(boolean includeLatencyHistograms) {
+ this.includeLatencyHistograms = includeLatencyHistograms;
+ }
+
+ /** Creates a {@link ClientStats} under a stats scope named after the supplied method (service prefix stripped). */
+ <ReqT, RespT> ClientStats createMetricsForMethod(MethodDescriptor<ReqT, RespT> methodDescriptor,
+ StatsLogger statsLogger) {
+
+ String fullMethodName = methodDescriptor.getFullMethodName();
+ String serviceName = MethodDescriptor.extractFullServiceName(fullMethodName);
+ String methodName = fullMethodName.substring(serviceName.length() + 1);
+
+ MethodType type = methodDescriptor.getType();
+ return new ClientStats(
+ statsLogger.scope(methodName),
+ includeLatencyHistograms,
+ type == MethodType.CLIENT_STREAMING || type == MethodType.BIDI_STREAMING,
+ type == MethodType.SERVER_STREAMING || type == MethodType.BIDI_STREAMING);
+ }
+ }
+
+}
diff --git a/stream/common/src/main/java/org/apache/bookkeeper/common/grpc/stats/MonitoringClientCall.java b/stream/common/src/main/java/org/apache/bookkeeper/common/grpc/stats/MonitoringClientCall.java
new file mode 100644
index 0000000..763ca98
--- /dev/null
+++ b/stream/common/src/main/java/org/apache/bookkeeper/common/grpc/stats/MonitoringClientCall.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.bookkeeper.common.grpc.stats;
+
+import io.grpc.ClientCall;
+import io.grpc.ForwardingClientCall.SimpleForwardingClientCall;
+import io.grpc.Metadata;
+
+/**
+ * A {@link SimpleForwardingClientCall} that records call-started and message-sent stats and wraps the response listener.
+ */
+class MonitoringClientCall<ReqT, RespT> extends SimpleForwardingClientCall<ReqT, RespT> {
+
+ private final ClientStats stats;
+
+ MonitoringClientCall(ClientCall<ReqT, RespT> delegate,
+ ClientStats stats) {
+ super(delegate);
+ this.stats = stats;
+ }
+
+ @Override
+ public void start(Listener<RespT> responseListener, Metadata headers) {
+ stats.recordCallStarted();
+ super.start(new MonitoringClientCallListener<>(
+ responseListener, stats
+ ), headers);
+ }
+
+ @Override
+ public void sendMessage(ReqT message) {
+ stats.recordStreamMessageSent();
+ super.sendMessage(message);
+ }
+}
diff --git a/stream/common/src/main/java/org/apache/bookkeeper/common/grpc/stats/MonitoringClientCallListener.java b/stream/common/src/main/java/org/apache/bookkeeper/common/grpc/stats/MonitoringClientCallListener.java
new file mode 100644
index 0000000..651439e
--- /dev/null
+++ b/stream/common/src/main/java/org/apache/bookkeeper/common/grpc/stats/MonitoringClientCallListener.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.bookkeeper.common.grpc.stats;
+
+import io.grpc.ClientCall.Listener;
+import io.grpc.ForwardingClientCallListener;
+import io.grpc.Metadata;
+import io.grpc.Status;
+import org.apache.bookkeeper.common.util.MathUtils;
+
+/**
+ * A {@link ForwardingClientCallListener} that records received messages, completion and latency of a client call.
+ */
+class MonitoringClientCallListener<RespT> extends ForwardingClientCallListener<RespT> {
+
+ private final Listener<RespT> delegate;
+ private final ClientStats stats;
+ private final long startNanos;
+
+ MonitoringClientCallListener(Listener<RespT> delegate,
+ ClientStats stats) {
+ this.delegate = delegate;
+ this.stats = stats;
+ this.startNanos = MathUtils.nowInNano();
+ }
+
+ @Override
+ protected Listener<RespT> delegate() {
+ return delegate;
+ }
+
+ @Override
+ public void onMessage(RespT message) {
+ stats.recordStreamMessageReceived();
+ super.onMessage(message);
+ }
+
+ @Override
+ public void onClose(Status status, Metadata trailers) {
+ stats.recordClientHandled(status.getCode());
+ if (stats.shouldRecordLatency()) {
+ long latencyMicros = MathUtils.elapsedMicroSec(startNanos);
+ stats.recordLatency(status.isOk(), latencyMicros); // by code: an OK status need not be the Status.OK instance
+ }
+ super.onClose(status, trailers);
+ }
+
+ @Override
+ public String toString() {
+ return delegate.toString();
+ }
+}
diff --git a/stream/common/src/main/java/org/apache/bookkeeper/common/grpc/stats/MonitoringClientInterceptor.java b/stream/common/src/main/java/org/apache/bookkeeper/common/grpc/stats/MonitoringClientInterceptor.java
new file mode 100644
index 0000000..868f249
--- /dev/null
+++ b/stream/common/src/main/java/org/apache/bookkeeper/common/grpc/stats/MonitoringClientInterceptor.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.bookkeeper.common.grpc.stats;
+
+import io.grpc.CallOptions;
+import io.grpc.Channel;
+import io.grpc.ClientCall;
+import io.grpc.ClientInterceptor;
+import io.grpc.MethodDescriptor;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import org.apache.bookkeeper.common.grpc.stats.ClientStats.Factory;
+import org.apache.bookkeeper.stats.StatsLogger;
+
+/**
+ * A {@link ClientInterceptor} that records stats about grpc calls, kept per full method name, to a stats logger.
+ */
+public class MonitoringClientInterceptor implements ClientInterceptor {
+
+ /**
+ * Create a monitoring client interceptor with the provided stats logger and configuration.
+ *
+ * @param statsLogger stats logger to collect grpc stats
+ * @param includeLatencyHistograms flag indicating whether to include latency histograms or not
+ * @return a monitoring client interceptor
+ */
+ public static MonitoringClientInterceptor create(StatsLogger statsLogger,
+ boolean includeLatencyHistograms) {
+ return new MonitoringClientInterceptor(
+ new Factory(includeLatencyHistograms), statsLogger);
+ }
+
+ private final Factory statsFactory;
+ private final StatsLogger statsLogger;
+ private final ConcurrentMap<String, ClientStats> methods;
+
+ private MonitoringClientInterceptor(Factory statsFactory,
+ StatsLogger statsLogger) {
+ this.statsFactory = statsFactory;
+ this.statsLogger = statsLogger;
+ this.methods = new ConcurrentHashMap<>();
+ }
+
+ private ClientStats getMethodStats(MethodDescriptor<?, ?> method) {
+ ClientStats stats = methods.get(method.getFullMethodName());
+ if (null != stats) {
+ return stats;
+ }
+ ClientStats newStats = statsFactory.createMetricsForMethod(method, statsLogger);
+ ClientStats oldStats = methods.putIfAbsent(method.getFullMethodName(), newStats);
+ if (null != oldStats) {
+ return oldStats;
+ } else {
+ return newStats;
+ }
+ }
+
+ @Override
+ public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall(
+ MethodDescriptor<ReqT, RespT> method, CallOptions callOptions, Channel next) {
+ ClientStats stats = getMethodStats(method);
+ return new MonitoringClientCall<>(
+ next.newCall(method, callOptions),
+ stats
+ );
+ }
+}
diff --git a/stream/common/src/main/java/org/apache/bookkeeper/common/grpc/stats/MonitoringServerCall.java b/stream/common/src/main/java/org/apache/bookkeeper/common/grpc/stats/MonitoringServerCall.java
new file mode 100644
index 0000000..abb0969
--- /dev/null
+++ b/stream/common/src/main/java/org/apache/bookkeeper/common/grpc/stats/MonitoringServerCall.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.bookkeeper.common.grpc.stats;
+
+import io.grpc.ForwardingServerCall.SimpleForwardingServerCall;
+import io.grpc.Metadata;
+import io.grpc.ServerCall;
+import io.grpc.Status;
+import org.apache.bookkeeper.common.util.MathUtils;
+
+/**
+ * A {@link SimpleForwardingServerCall} that records call start, sent messages, completion and latency of a server call.
+ */
+class MonitoringServerCall<ReqT, RespT> extends SimpleForwardingServerCall<ReqT, RespT> {
+
+ private final ServerStats stats;
+ private final long startNanos;
+
+ MonitoringServerCall(ServerCall<ReqT, RespT> delegate,
+ ServerStats stats) {
+ super(delegate);
+ this.stats = stats;
+ this.startNanos = MathUtils.nowInNano();
+ stats.recordCallStarted();
+ }
+
+ @Override
+ public void sendMessage(RespT message) {
+ stats.recordStreamMessageSent();
+ super.sendMessage(message);
+ }
+
+ @Override
+ public void close(Status status, Metadata trailers) {
+ stats.recordServerHandled(status.getCode());
+ if (stats.shouldRecordLatency()) {
+ long latencyMicros = MathUtils.elapsedMicroSec(startNanos);
+ stats.recordLatency(status.isOk(), latencyMicros); // by code: an OK status need not be the Status.OK instance
+ }
+ super.close(status, trailers);
+ }
+}
diff --git a/stream/common/src/main/java/org/apache/bookkeeper/common/grpc/stats/MonitoringServerCallListener.java b/stream/common/src/main/java/org/apache/bookkeeper/common/grpc/stats/MonitoringServerCallListener.java
new file mode 100644
index 0000000..6eef140
--- /dev/null
+++ b/stream/common/src/main/java/org/apache/bookkeeper/common/grpc/stats/MonitoringServerCallListener.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.bookkeeper.common.grpc.stats;
+
+import io.grpc.ForwardingServerCallListener;
+import io.grpc.ServerCall.Listener;
+
+/**
+ * A {@link ForwardingServerCallListener} that counts request messages received by a grpc server call.
+ */
+class MonitoringServerCallListener<ReqT> extends ForwardingServerCallListener<ReqT> {
+
+ private final Listener<ReqT> delegate;
+ private final ServerStats stats;
+
+ MonitoringServerCallListener(Listener<ReqT> delegate,
+ ServerStats stats) {
+ this.delegate = delegate;
+ this.stats = stats;
+ }
+
+ @Override
+ protected Listener<ReqT> delegate() {
+ return delegate;
+ }
+
+ @Override
+ public void onMessage(ReqT message) {
+ stats.recordStreamMessageReceived();
+ super.onMessage(message);
+ }
+
+ @Override
+ public String toString() {
+ return delegate.toString();
+ }
+}
diff --git a/stream/common/src/main/java/org/apache/bookkeeper/common/grpc/stats/MonitoringServerInterceptor.java b/stream/common/src/main/java/org/apache/bookkeeper/common/grpc/stats/MonitoringServerInterceptor.java
new file mode 100644
index 0000000..1c3ed25
--- /dev/null
+++ b/stream/common/src/main/java/org/apache/bookkeeper/common/grpc/stats/MonitoringServerInterceptor.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.bookkeeper.common.grpc.stats;
+
+import io.grpc.Metadata;
+import io.grpc.MethodDescriptor;
+import io.grpc.ServerCall;
+import io.grpc.ServerCall.Listener;
+import io.grpc.ServerCallHandler;
+import io.grpc.ServerInterceptor;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import org.apache.bookkeeper.common.grpc.stats.ServerStats.Factory;
+import org.apache.bookkeeper.stats.StatsLogger;
+
+/**
+ * A {@link ServerInterceptor} that records stats about grpc calls, kept per full method name, to a stats logger.
+ */
+public class MonitoringServerInterceptor implements ServerInterceptor {
+
+ /**
+ * Create a monitoring server interceptor with the provided stats logger and configuration.
+ *
+ * @param statsLogger stats logger to collect grpc stats
+ * @param includeLatencyHistograms flag indicating whether to include latency histograms or not
+ * @return a monitoring server interceptor
+ */
+ public static MonitoringServerInterceptor create(StatsLogger statsLogger,
+ boolean includeLatencyHistograms) {
+ return new MonitoringServerInterceptor(
+ new Factory(includeLatencyHistograms), statsLogger);
+ }
+
+ private final Factory statsFactory;
+ private final StatsLogger statsLogger;
+ private final ConcurrentMap<String, ServerStats> methods;
+
+ private MonitoringServerInterceptor(Factory statsFactory,
+ StatsLogger statsLogger) {
+ this.statsFactory = statsFactory;
+ this.statsLogger = statsLogger;
+ this.methods = new ConcurrentHashMap<>();
+ }
+
+ private ServerStats getMethodStats(MethodDescriptor<?, ?> method) {
+ ServerStats stats = methods.get(method.getFullMethodName());
+ if (null != stats) {
+ return stats;
+ }
+ ServerStats newStats = statsFactory.createMetricsForMethod(method, statsLogger);
+ ServerStats oldStats = methods.putIfAbsent(method.getFullMethodName(), newStats);
+ if (null != oldStats) {
+ return oldStats;
+ } else {
+ return newStats;
+ }
+ }
+
+
+ @Override
+ public <ReqT, RespT> Listener<ReqT> interceptCall(ServerCall<ReqT, RespT> call,
+ Metadata headers,
+ ServerCallHandler<ReqT, RespT> next) {
+ MethodDescriptor<ReqT, RespT> method = call.getMethodDescriptor();
+ ServerStats stats = getMethodStats(method);
+ ServerCall<ReqT, RespT> monitoringCall = new MonitoringServerCall<>(call, stats);
+ return new MonitoringServerCallListener<>(
+ next.startCall(monitoringCall, headers), stats
+ );
+ }
+}
diff --git a/stream/common/src/main/java/org/apache/bookkeeper/common/grpc/stats/ServerStats.java b/stream/common/src/main/java/org/apache/bookkeeper/common/grpc/stats/ServerStats.java
new file mode 100644
index 0000000..eae8348
--- /dev/null
+++ b/stream/common/src/main/java/org/apache/bookkeeper/common/grpc/stats/ServerStats.java
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.bookkeeper.common.grpc.stats;
+
+import io.grpc.MethodDescriptor;
+import io.grpc.MethodDescriptor.MethodType;
+import io.grpc.Status.Code;
+import java.util.Optional;
+import java.util.concurrent.TimeUnit;
+import org.apache.bookkeeper.stats.Counter;
+import org.apache.bookkeeper.stats.NullStatsLogger;
+import org.apache.bookkeeper.stats.OpStatsLogger;
+import org.apache.bookkeeper.stats.StatsLogger;
+
+/**
+ * Server side monitoring for grpc services.
+ */
+class ServerStats {
+
+ private final Counter rpcStarted;
+ private final Counter rpcCompleted;
+ private final Counter streamMessagesReceived;
+ private final Counter streamMessagesSent;
+ private final Optional<OpStatsLogger> completedLatencyMicros;
+
+ private ServerStats(StatsLogger rootStatsLogger,
+ boolean includeLatencyHistograms,
+ boolean streamRequests,
+ boolean streamResponses) {
+ this.rpcStarted = rootStatsLogger.getCounter("grpc_started");
+ this.rpcCompleted = rootStatsLogger.getCounter("grpc_completed");
+ if (streamRequests) {
+ this.streamMessagesReceived = rootStatsLogger.getCounter("grpc_msg_received");
+ } else {
+ this.streamMessagesReceived = NullStatsLogger.INSTANCE.getCounter("grpc_msg_received");
+ }
+ if (streamResponses) {
+ this.streamMessagesSent = rootStatsLogger.getCounter("grpc_msg_sent");
+ } else {
+ this.streamMessagesSent = NullStatsLogger.INSTANCE.getCounter("grpc_msg_sent");
+ }
+ if (includeLatencyHistograms) {
+ this.completedLatencyMicros = Optional.of(
+ rootStatsLogger.getOpStatsLogger("grpc_latency_micros")
+ );
+ } else {
+ this.completedLatencyMicros = Optional.empty();
+ }
+ }
+
+ public void recordCallStarted() {
+ rpcStarted.inc();
+ }
+
+ public void recordServerHandled(Code code) {
+ rpcCompleted.inc();
+ }
+
+ public void recordStreamMessageSent() {
+ streamMessagesSent.inc();
+ }
+
+ public void recordStreamMessageReceived() {
+ streamMessagesReceived.inc();
+ }
+
+ public boolean shouldRecordLatency() {
+ return completedLatencyMicros.isPresent();
+ }
+
+ public void recordLatency(boolean success, long latencyMicros) {
+ completedLatencyMicros.ifPresent(latencyLogger -> {
+ if (success) {
+ latencyLogger.registerSuccessfulEvent(latencyMicros, TimeUnit.MICROSECONDS);
+ } else {
+ latencyLogger.registerFailedEvent(latencyMicros, TimeUnit.MICROSECONDS);
+ }
+ });
+ }
+
+ /**
+ * Knows how to produce {@link ServerStats} instances for individual methods.
+ */
+ static class Factory {
+
+ private final boolean includeLatencyHistograms;
+
+ Factory(boolean includeLatencyHistograms) {
+ this.includeLatencyHistograms = includeLatencyHistograms;
+ }
+
+ /** Creates a {@link ServerStats} for the supplied method. */
+ <ReqT, RespT> ServerStats createMetricsForMethod(MethodDescriptor<ReqT, RespT> methodDescriptor,
+ StatsLogger statsLogger) {
+
+ String fullMethodName = methodDescriptor.getFullMethodName();
+ String serviceName = MethodDescriptor.extractFullServiceName(fullMethodName);
+ String methodName = fullMethodName.substring(serviceName.length() + 1);
+
+ MethodType type = methodDescriptor.getType();
+ return new ServerStats(
+ statsLogger.scope(methodName),
+ includeLatencyHistograms,
+ type == MethodType.CLIENT_STREAMING || type == MethodType.BIDI_STREAMING,
+ type == MethodType.SERVER_STREAMING || type == MethodType.BIDI_STREAMING);
+ }
+ }
+
+}
diff --git a/stream/common/src/main/java/org/apache/bookkeeper/common/grpc/stats/package-info.java b/stream/common/src/main/java/org/apache/bookkeeper/common/grpc/stats/package-info.java
new file mode 100644
index 0000000..a3e2d3e
--- /dev/null
+++ b/stream/common/src/main/java/org/apache/bookkeeper/common/grpc/stats/package-info.java
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/**
+ * Collecting grpc related stats.
+ */
+package org.apache.bookkeeper.common.grpc.stats;
\ No newline at end of file
diff --git a/stream/common/src/test/java/org/apache/bookkeeper/common/grpc/stats/ClientStatsTest.java b/stream/common/src/test/java/org/apache/bookkeeper/common/grpc/stats/ClientStatsTest.java
new file mode 100644
index 0000000..d5cf2d3
--- /dev/null
+++ b/stream/common/src/test/java/org/apache/bookkeeper/common/grpc/stats/ClientStatsTest.java
@@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.bookkeeper.common.grpc.stats;
+
+import static org.junit.Assert.assertEquals;
+
+import io.grpc.MethodDescriptor;
+import io.grpc.Status;
+import java.util.concurrent.TimeUnit;
+import org.apache.bookkeeper.common.grpc.stats.ClientStats.Factory;
+import org.apache.bookkeeper.stats.StatsLogger;
+import org.apache.bookkeeper.test.TestStatsProvider;
+import org.apache.bookkeeper.test.TestStatsProvider.TestOpStatsLogger;
+import org.bookkeeper.tests.proto.rpc.PingPongServiceGrpc;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * Unit test {@link ClientStats}.
+ */
+public class ClientStatsTest {
+
+ private Factory factoryWithHistograms;
+ private Factory factoryWithoutHistograms;
+ private TestStatsProvider statsProvider;
+
+ @Before
+ public void setup() {
+ this.statsProvider = new TestStatsProvider();
+ this.factoryWithHistograms = new Factory(true);
+ this.factoryWithoutHistograms = new Factory(false);
+ }
+
+ @Test
+ public void testClientStatsWithHistogram() {
+ testClientStats(factoryWithHistograms, true);
+ }
+
+ @Test
+ public void testClientStatsWithoutHistogram() {
+ testClientStats(factoryWithoutHistograms, false);
+ }
+
+ private void testClientStats(Factory clientStatsFactory,
+ boolean includeLatencyHistogram) {
+ // test unary method
+ MethodDescriptor<?, ?> unaryMethod = PingPongServiceGrpc.getPingPongMethod();
+ testClientStats(
+ clientStatsFactory,
+ includeLatencyHistogram,
+ unaryMethod,
+ "PingPong",
+ "unary",
+ 1,
+ 1,
+ 0,
+ 0
+ );
+ // test client streaming
+ MethodDescriptor<?, ?> clientStreamingMethod = PingPongServiceGrpc.getLotsOfPingsMethod();
+ testClientStats(
+ clientStatsFactory,
+ includeLatencyHistogram,
+ clientStreamingMethod,
+ "LotsOfPings",
+ "client_streaming",
+ 1,
+ 1,
+ 1,
+ 0
+ );
+ // test server streaming
+ MethodDescriptor<?, ?> serverStreamingMethod = PingPongServiceGrpc.getLotsOfPongsMethod();
+ testClientStats(
+ clientStatsFactory,
+ includeLatencyHistogram,
+ serverStreamingMethod,
+ "LotsOfPongs",
+ "server_streaming",
+ 1,
+ 1,
+ 0,
+ 2
+ );
+ // test bidi streaming
+ MethodDescriptor<?, ?> biStreamingMethod = PingPongServiceGrpc.getBidiPingPongMethod();
+ testClientStats(
+ clientStatsFactory,
+ includeLatencyHistogram,
+ biStreamingMethod,
+ "BidiPingPong",
+ "bidi_streaming",
+ 1,
+ 1,
+ 1,
+ 2
+ );
+ }
+
+ private void testClientStats(Factory clientStatsFactory,
+ boolean includeLatencyHistogram,
+ MethodDescriptor<?, ?> method,
+ String methodName,
+ String statsScope,
+ long expectedCallStarted,
+ long expectedCallCompleted,
+ long expectedStreamMsgsSent,
+ long expectedStreamMsgsReceived) {
+ StatsLogger statsLogger = statsProvider.getStatsLogger(statsScope);
+ ClientStats unaryStats = clientStatsFactory.createMetricsForMethod(
+ method,
+ statsLogger
+ );
+ unaryStats.recordCallStarted();
+ assertEquals(
+ expectedCallStarted,
+ statsLogger.scope(methodName).getCounter("grpc_started").get().longValue());
+ unaryStats.recordClientHandled(Status.OK.getCode());
+ assertEquals(
+ expectedCallCompleted,
+ statsLogger.scope(methodName).getCounter("grpc_completed").get().longValue());
+ unaryStats.recordStreamMessageSent();
+ assertEquals(
+ expectedStreamMsgsSent,
+ statsLogger.scope(methodName).getCounter("grpc_msg_sent").get().longValue());
+ unaryStats.recordStreamMessageReceived();
+ unaryStats.recordStreamMessageReceived();
+ assertEquals(
+ expectedStreamMsgsReceived,
+ statsLogger.scope(methodName).getCounter("grpc_msg_received").get().longValue());
+ long latencyMicros = 12345L;
+ unaryStats.recordLatency(true, latencyMicros);
+ TestOpStatsLogger opStatsLogger =
+ (TestOpStatsLogger) statsLogger.scope(methodName).getOpStatsLogger("grpc_latency_micros");
+ if (includeLatencyHistogram) {
+ assertEquals(1, opStatsLogger.getSuccessCount());
+ assertEquals(
+ TimeUnit.MICROSECONDS.toNanos(latencyMicros),
+ (long) opStatsLogger.getSuccessAverage());
+ } else {
+ assertEquals(0, opStatsLogger.getSuccessCount());
+ assertEquals(0, (long) opStatsLogger.getSuccessAverage());
+ }
+ }
+
+}
diff --git a/stream/common/src/test/java/org/apache/bookkeeper/common/grpc/stats/GrpcStatsIntegrationTest.java b/stream/common/src/test/java/org/apache/bookkeeper/common/grpc/stats/GrpcStatsIntegrationTest.java
new file mode 100644
index 0000000..e34f30e
--- /dev/null
+++ b/stream/common/src/test/java/org/apache/bookkeeper/common/grpc/stats/GrpcStatsIntegrationTest.java
@@ -0,0 +1,324 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.bookkeeper.common.grpc.stats;
+
+import static org.apache.bookkeeper.common.concurrent.FutureUtils.result;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+
+import io.grpc.Channel;
+import io.grpc.ClientInterceptors;
+import io.grpc.ManagedChannel;
+import io.grpc.Server;
+import io.grpc.ServerInterceptors;
+import io.grpc.ServerServiceDefinition;
+import io.grpc.inprocess.InProcessChannelBuilder;
+import io.grpc.inprocess.InProcessServerBuilder;
+import io.grpc.stub.StreamObserver;
+import io.grpc.util.MutableHandlerRegistry;
+import java.util.Iterator;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadLocalRandom;
+import org.apache.bookkeeper.common.concurrent.FutureUtils;
+import org.apache.bookkeeper.test.TestStatsProvider;
+import org.apache.bookkeeper.test.TestStatsProvider.TestOpStatsLogger;
+import org.apache.bookkeeper.test.TestStatsProvider.TestStatsLogger;
+import org.apache.bookkeeper.tests.rpc.PingPongService;
+import org.bookkeeper.tests.proto.rpc.PingPongServiceGrpc;
+import org.bookkeeper.tests.proto.rpc.PingPongServiceGrpc.PingPongServiceBlockingStub;
+import org.bookkeeper.tests.proto.rpc.PingPongServiceGrpc.PingPongServiceStub;
+import org.bookkeeper.tests.proto.rpc.PingRequest;
+import org.bookkeeper.tests.proto.rpc.PongResponse;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * End-to-end integration test on grpc stats.
+ */
+public class GrpcStatsIntegrationTest {
+
+ private static final int NUM_PONGS_PER_PING = 10;
+ private static final String SERVICE_NAME = "pingpong";
+
+ private Server server;
+ private PingPongService service;
+ private ManagedChannel channel;
+ private Channel monitoredChannel;
+ private PingPongServiceBlockingStub client;
+ private PingPongServiceStub clientNonBlocking;
+ private TestStatsProvider statsProvider;
+ private TestStatsLogger clientStatsLogger;
+ private TestStatsLogger serverStatsLogger;
+
+
+ @Before
+ public void setup() throws Exception {
+ statsProvider = new TestStatsProvider();
+ clientStatsLogger = statsProvider.getStatsLogger("client");
+ serverStatsLogger = statsProvider.getStatsLogger("server");
+ service = new PingPongService(NUM_PONGS_PER_PING);
+ ServerServiceDefinition monitoredService = ServerInterceptors.intercept(
+ service,
+ MonitoringServerInterceptor.create(serverStatsLogger, true)
+ );
+ MutableHandlerRegistry registry = new MutableHandlerRegistry();
+ server = InProcessServerBuilder
+ .forName(SERVICE_NAME)
+ .fallbackHandlerRegistry(registry)
+ .directExecutor()
+ .build()
+ .start();
+ registry.addService(monitoredService);
+
+ channel = InProcessChannelBuilder.forName(SERVICE_NAME)
+ .usePlaintext()
+ .build();
+ monitoredChannel = ClientInterceptors.intercept(
+ channel,
+ MonitoringClientInterceptor.create(clientStatsLogger, true)
+ );
+ client = PingPongServiceGrpc.newBlockingStub(monitoredChannel);
+ clientNonBlocking = PingPongServiceGrpc.newStub(monitoredChannel);
+ }
+
+ @After
+ public void teardown() {
+ if (null != channel) {
+ channel.shutdown();
+ }
+ if (null != server) {
+ server.shutdown();
+ }
+ }
+
+ private void assertStats(String methodName,
+ long numCalls,
+ long numClientMsgSent,
+ long numClientMsgReceived,
+ long numServerMsgSent,
+ long numServerMsgReceived) {
+ // client stats
+ assertEquals(
+ numCalls,
+ clientStatsLogger.scope(methodName).getCounter("grpc_started").get().longValue()
+ );
+ assertEquals(
+ numCalls,
+ clientStatsLogger.scope(methodName).getCounter("grpc_completed").get().longValue()
+ );
+ assertEquals(
+ numClientMsgSent,
+ clientStatsLogger.scope(methodName).getCounter("grpc_msg_sent").get().longValue()
+ );
+ assertEquals(
+ numClientMsgReceived,
+ clientStatsLogger.scope(methodName).getCounter("grpc_msg_received").get().longValue()
+ );
+ TestOpStatsLogger opStatsLogger =
+ (TestOpStatsLogger) clientStatsLogger.scope(methodName).getOpStatsLogger("grpc_latency_micros");
+ assertEquals(
+ numCalls,
+ opStatsLogger.getSuccessCount()
+ );
+ // server stats
+ assertEquals(
+ numCalls,
+ serverStatsLogger.scope(methodName).getCounter("grpc_started").get().longValue()
+ );
+ assertEquals(
+ numCalls,
+ serverStatsLogger.scope(methodName).getCounter("grpc_completed").get().longValue()
+ );
+ assertEquals(
+ numServerMsgSent,
+ serverStatsLogger.scope(methodName).getCounter("grpc_msg_sent").get().longValue()
+ );
+ assertEquals(
+ numServerMsgReceived,
+ serverStatsLogger.scope(methodName).getCounter("grpc_msg_received").get().longValue()
+ );
+ opStatsLogger =
+ (TestOpStatsLogger) serverStatsLogger.scope(methodName).getOpStatsLogger("grpc_latency_micros");
+ assertEquals(
+ numCalls,
+ opStatsLogger.getSuccessCount()
+ );
+ }
+
+ @Test
+ public void testUnary() {
+ long sequence = ThreadLocalRandom.current().nextLong();
+ PingRequest request = PingRequest.newBuilder()
+ .setSequence(sequence)
+ .build();
+ PongResponse response = client.pingPong(request);
+ assertEquals(sequence, response.getLastSequence());
+ assertEquals(1, response.getNumPingReceived());
+ assertEquals(0, response.getSlotId());
+
+ // verify the stats
+ assertStats(
+ "PingPong",
+ 1,
+ 0,
+ 0,
+ 0,
+ 0);
+ }
+
+ @Test
+ public void testServerStreaming() {
+ long sequence = ThreadLocalRandom.current().nextLong(100000);
+ PingRequest request = PingRequest.newBuilder()
+ .setSequence(sequence)
+ .build();
+ Iterator<PongResponse> respIter = client.lotsOfPongs(request);
+ int count = 0;
+ while (respIter.hasNext()) {
+ PongResponse resp = respIter.next();
+ assertEquals(sequence, resp.getLastSequence());
+ assertEquals(1, resp.getNumPingReceived());
+ assertEquals(count, resp.getSlotId());
+ ++count;
+ }
+
+ assertStats(
+ "LotsOfPongs",
+ 1,
+ 0,
+ NUM_PONGS_PER_PING,
+ NUM_PONGS_PER_PING,
+ 0);
+ }
+
+ @Test
+ public void testClientStreaming() throws Exception {
+ final int numPings = 100;
+ final long sequence = ThreadLocalRandom.current().nextLong(100000);
+ final CompletableFuture<Void> respFuture = new CompletableFuture<>();
+ final LinkedBlockingQueue<PongResponse> respQueue = new LinkedBlockingQueue<>();
+ StreamObserver<PingRequest> pinger = clientNonBlocking.lotsOfPings(new StreamObserver<PongResponse>() {
+ @Override
+ public void onNext(PongResponse resp) {
+ respQueue.offer(resp);
+ }
+
+ @Override
+ public void onError(Throwable t) {
+ respFuture.completeExceptionally(t);
+ }
+
+ @Override
+ public void onCompleted() {
+ FutureUtils.complete(respFuture, null);
+ }
+ });
+
+ for (int i = 0; i < numPings; i++) {
+ PingRequest request = PingRequest.newBuilder()
+ .setSequence(sequence + i)
+ .build();
+ pinger.onNext(request);
+ }
+ pinger.onCompleted();
+
+ // wait for response to be received.
+ result(respFuture);
+
+ assertEquals(1, respQueue.size());
+
+ PongResponse resp = respQueue.take();
+ assertEquals(sequence + numPings - 1, resp.getLastSequence());
+ assertEquals(numPings, resp.getNumPingReceived());
+ assertEquals(0, resp.getSlotId());
+
+ assertStats(
+ "LotsOfPings",
+ 1,
+ numPings,
+ 0,
+ 0,
+ numPings
+ );
+ }
+
+ @Test
+ public void testBidiStreaming() throws Exception {
+ final int numPings = 100;
+
+ final CompletableFuture<Void> respFuture = new CompletableFuture<>();
+ final LinkedBlockingQueue<PongResponse> respQueue = new LinkedBlockingQueue<>();
+ StreamObserver<PingRequest> pinger = clientNonBlocking.bidiPingPong(new StreamObserver<PongResponse>() {
+ @Override
+ public void onNext(PongResponse resp) {
+ respQueue.offer(resp);
+ }
+
+ @Override
+ public void onError(Throwable t) {
+ respFuture.completeExceptionally(t);
+ }
+
+ @Override
+ public void onCompleted() {
+ FutureUtils.complete(respFuture, null);
+ }
+ });
+
+ final LinkedBlockingQueue<PingRequest> reqQueue = new LinkedBlockingQueue<>();
+ for (int i = 0; i < numPings; i++) {
+ final long sequence = ThreadLocalRandom.current().nextLong(100000);
+ PingRequest request = PingRequest.newBuilder()
+ .setSequence(sequence)
+ .build();
+ reqQueue.put(request);
+ pinger.onNext(request);
+ }
+ pinger.onCompleted();
+
+ // wait for response to be received
+ result(respFuture);
+
+ assertEquals(numPings, respQueue.size());
+
+ int count = 0;
+ for (PingRequest request : reqQueue) {
+ PongResponse response = respQueue.take();
+
+ assertEquals(request.getSequence(), response.getLastSequence());
+ assertEquals(++count, response.getNumPingReceived());
+ assertEquals(0, response.getSlotId());
+ }
+ assertNull(respQueue.poll());
+ assertEquals(numPings, count);
+
+ assertStats(
+ "BidiPingPong",
+ 1,
+ numPings,
+ numPings,
+ numPings,
+ numPings
+ );
+ }
+
+}
diff --git a/stream/common/src/test/java/org/apache/bookkeeper/common/grpc/stats/ServerStatsTest.java b/stream/common/src/test/java/org/apache/bookkeeper/common/grpc/stats/ServerStatsTest.java
new file mode 100644
index 0000000..8300aa6
--- /dev/null
+++ b/stream/common/src/test/java/org/apache/bookkeeper/common/grpc/stats/ServerStatsTest.java
@@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.bookkeeper.common.grpc.stats;
+
+import static org.junit.Assert.assertEquals;
+
+import io.grpc.MethodDescriptor;
+import io.grpc.Status;
+import java.util.concurrent.TimeUnit;
+import org.apache.bookkeeper.common.grpc.stats.ServerStats.Factory;
+import org.apache.bookkeeper.stats.StatsLogger;
+import org.apache.bookkeeper.test.TestStatsProvider;
+import org.apache.bookkeeper.test.TestStatsProvider.TestOpStatsLogger;
+import org.bookkeeper.tests.proto.rpc.PingPongServiceGrpc;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * Unit test {@link ServerStats}.
+ */
+public class ServerStatsTest {
+
+ private Factory factoryWithHistograms;
+ private Factory factoryWithoutHistograms;
+ private TestStatsProvider statsProvider;
+
+ @Before
+ public void setup() {
+ this.statsProvider = new TestStatsProvider();
+ this.factoryWithHistograms = new Factory(true);
+ this.factoryWithoutHistograms = new Factory(false);
+ }
+
+ @Test
+ public void testServerStatsWithHistogram() {
+ testServerStats(factoryWithHistograms, true);
+ }
+
+ @Test
+ public void testServerStatsWithoutHistogram() {
+ testServerStats(factoryWithoutHistograms, false);
+ }
+
+ private void testServerStats(Factory clientStatsFactory,
+ boolean includeLatencyHistogram) {
+ // test unary method
+ MethodDescriptor<?, ?> unaryMethod = PingPongServiceGrpc.getPingPongMethod();
+ testServerStats(
+ clientStatsFactory,
+ includeLatencyHistogram,
+ unaryMethod,
+ "PingPong",
+ "unary",
+ 1,
+ 1,
+ 0,
+ 0
+ );
+ // test client streaming
+ MethodDescriptor<?, ?> clientStreamingMethod = PingPongServiceGrpc.getLotsOfPingsMethod();
+ testServerStats(
+ clientStatsFactory,
+ includeLatencyHistogram,
+ clientStreamingMethod,
+ "LotsOfPings",
+ "client_streaming",
+ 1,
+ 1,
+ 0,
+ 2
+ );
+ // test server streaming
+ MethodDescriptor<?, ?> serverStreamingMethod = PingPongServiceGrpc.getLotsOfPongsMethod();
+ testServerStats(
+ clientStatsFactory,
+ includeLatencyHistogram,
+ serverStreamingMethod,
+ "LotsOfPongs",
+ "server_streaming",
+ 1,
+ 1,
+ 1,
+ 0
+ );
+ // test bidi streaming
+ MethodDescriptor<?, ?> biStreamingMethod = PingPongServiceGrpc.getBidiPingPongMethod();
+ testServerStats(
+ clientStatsFactory,
+ includeLatencyHistogram,
+ biStreamingMethod,
+ "BidiPingPong",
+ "bidi_streaming",
+ 1,
+ 1,
+ 1,
+ 2
+ );
+ }
+
+ private void testServerStats(Factory clientStatsFactory,
+ boolean includeLatencyHistogram,
+ MethodDescriptor<?, ?> method,
+ String methodName,
+ String statsScope,
+ long expectedCallStarted,
+ long expectedCallCompleted,
+ long expectedStreamMsgsSent,
+ long expectedStreamMsgsReceived) {
+ StatsLogger statsLogger = statsProvider.getStatsLogger(statsScope);
+ ServerStats unaryStats = clientStatsFactory.createMetricsForMethod(
+ method,
+ statsLogger
+ );
+ unaryStats.recordCallStarted();
+ assertEquals(
+ expectedCallStarted,
+ statsLogger.scope(methodName).getCounter("grpc_started").get().longValue());
+ unaryStats.recordServerHandled(Status.OK.getCode());
+ assertEquals(
+ expectedCallCompleted,
+ statsLogger.scope(methodName).getCounter("grpc_completed").get().longValue());
+ unaryStats.recordStreamMessageSent();
+ assertEquals(
+ expectedStreamMsgsSent,
+ statsLogger.scope(methodName).getCounter("grpc_msg_sent").get().longValue());
+ unaryStats.recordStreamMessageReceived();
+ unaryStats.recordStreamMessageReceived();
+ assertEquals(
+ expectedStreamMsgsReceived,
+ statsLogger.scope(methodName).getCounter("grpc_msg_received").get().longValue());
+ long latencyMicros = 12345L;
+ unaryStats.recordLatency(true, latencyMicros);
+ TestOpStatsLogger opStatsLogger =
+ (TestOpStatsLogger) statsLogger.scope(methodName).getOpStatsLogger("grpc_latency_micros");
+ if (includeLatencyHistogram) {
+ assertEquals(1, opStatsLogger.getSuccessCount());
+ assertEquals(
+ TimeUnit.MICROSECONDS.toNanos(latencyMicros),
+ (long) opStatsLogger.getSuccessAverage());
+ } else {
+ assertEquals(0, opStatsLogger.getSuccessCount());
+ assertEquals(0, (long) opStatsLogger.getSuccessAverage());
+ }
+ }
+
+}
diff --git a/stream/server/src/main/java/org/apache/bookkeeper/stream/server/grpc/GrpcServer.java b/stream/server/src/main/java/org/apache/bookkeeper/stream/server/grpc/GrpcServer.java
index 7a5f163..1ef2642 100644
--- a/stream/server/src/main/java/org/apache/bookkeeper/stream/server/grpc/GrpcServer.java
+++ b/stream/server/src/main/java/org/apache/bookkeeper/stream/server/grpc/GrpcServer.java
@@ -18,12 +18,14 @@ import com.google.common.annotations.VisibleForTesting;
import io.grpc.HandlerRegistry;
import io.grpc.Server;
import io.grpc.ServerBuilder;
+import io.grpc.ServerInterceptors;
import io.grpc.ServerServiceDefinition;
import io.grpc.inprocess.InProcessServerBuilder;
import java.io.IOException;
import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.common.component.AbstractLifecycleComponent;
import org.apache.bookkeeper.common.grpc.proxy.ProxyHandlerRegistry;
+import org.apache.bookkeeper.common.grpc.stats.MonitoringServerInterceptor;
import org.apache.bookkeeper.stats.StatsLogger;
import org.apache.bookkeeper.stream.proto.common.Endpoint;
import org.apache.bookkeeper.stream.server.conf.StorageServerConfiguration;
@@ -75,13 +77,23 @@ public class GrpcServer extends AbstractLifecycleComponent<StorageServerConfigur
}
this.grpcServer = serverBuilder.build();
} else {
+ MonitoringServerInterceptor monitoringInterceptor =
+ MonitoringServerInterceptor.create(statsLogger.scope("services"), true);
ProxyHandlerRegistry.Builder proxyRegistryBuilder = ProxyHandlerRegistry.newBuilder()
.setChannelFinder(storageContainerStore);
for (ServerServiceDefinition definition : GrpcServices.create(null)) {
- proxyRegistryBuilder = proxyRegistryBuilder.addService(definition);
+ ServerServiceDefinition monitoredService = ServerInterceptors.intercept(
+ definition,
+ monitoringInterceptor
+ );
+ proxyRegistryBuilder = proxyRegistryBuilder.addService(monitoredService);
}
+ ServerServiceDefinition locationService = ServerInterceptors.intercept(
+ new GrpcStorageContainerService(storageContainerStore),
+ monitoringInterceptor
+ );
this.grpcServer = ServerBuilder.forPort(this.myEndpoint.getPort())
- .addService(new GrpcStorageContainerService(storageContainerStore))
+ .addService(locationService)
.fallbackHandlerRegistry(proxyRegistryBuilder.build())
.build();
}