You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ratis.apache.org by sz...@apache.org on 2021/06/29 00:45:58 UTC
[ratis] branch master updated: RATIS-1386. Some MetricsRegistry instances never unregistered (#486)
This is an automated email from the ASF dual-hosted git repository.
szetszwo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ratis.git
The following commit(s) were added to refs/heads/master by this push:
new 9b57296 RATIS-1386. Some MetricsRegistry instances never unregistered (#486)
9b57296 is described below
commit 9b57296a38ecb87cb74eb7895e6e9f1e201d70c1
Author: Doroszlai, Attila <64...@users.noreply.github.com>
AuthorDate: Tue Jun 29 02:45:52 2021 +0200
RATIS-1386. Some MetricsRegistry instances never unregistered (#486)
---
.../apache/ratis/grpc/client/GrpcClientProtocolClient.java | 13 +++++++------
.../metrics/intercept/client/MetricClientInterceptor.java | 9 ++++++++-
.../metrics/intercept/server/MetricServerInterceptor.java | 11 ++++++++++-
.../main/java/org/apache/ratis/grpc/server/GrpcService.java | 2 ++
.../apache/ratis/server/raftlog/memory/MemoryRaftLog.java | 7 +++++++
5 files changed, 34 insertions(+), 8 deletions(-)
diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientProtocolClient.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientProtocolClient.java
index dc8def1..48084c9 100644
--- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientProtocolClient.java
+++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientProtocolClient.java
@@ -92,6 +92,7 @@ public class GrpcClientProtocolClient implements Closeable {
private final AtomicReference<AsyncStreamObservers> orderedStreamObservers = new AtomicReference<>();
private final AtomicReference<AsyncStreamObservers> unorderedStreamObservers = new AtomicReference<>();
+ private final MetricClientInterceptor metricClientInterceptor;
GrpcClientProtocolClient(ClientId id, RaftPeer target, RaftProperties properties,
GrpcTlsConfig adminTlsConfig, GrpcTlsConfig clientTlsConfig) {
@@ -99,7 +100,7 @@ public class GrpcClientProtocolClient implements Closeable {
this.target = target;
final SizeInBytes flowControlWindow = GrpcConfigKeys.flowControlWindow(properties, LOG::debug);
final SizeInBytes maxMessageSize = GrpcConfigKeys.messageSizeMax(properties, LOG::debug);
- final MetricClientInterceptor monitoringInterceptor = new MetricClientInterceptor(getName());
+ metricClientInterceptor = new MetricClientInterceptor(getName());
final String clientAddress = Optional.ofNullable(target.getClientAddress())
.filter(x -> !x.isEmpty()).orElse(target.getAddress());
@@ -108,10 +109,10 @@ public class GrpcClientProtocolClient implements Closeable {
final boolean separateAdminChannel = !Objects.equals(clientAddress, adminAddress);
clientChannel = buildChannel(clientAddress, clientTlsConfig,
- flowControlWindow, maxMessageSize, monitoringInterceptor);
+ flowControlWindow, maxMessageSize);
adminChannel = separateAdminChannel
? buildChannel(adminAddress, adminTlsConfig,
- flowControlWindow, maxMessageSize, monitoringInterceptor)
+ flowControlWindow, maxMessageSize)
: clientChannel;
asyncStub = RaftClientProtocolServiceGrpc.newStub(clientChannel);
@@ -122,8 +123,7 @@ public class GrpcClientProtocolClient implements Closeable {
}
private ManagedChannel buildChannel(String address, GrpcTlsConfig tlsConf,
- SizeInBytes flowControlWindow, SizeInBytes maxMessageSize,
- MetricClientInterceptor monitoringInterceptor) {
+ SizeInBytes flowControlWindow, SizeInBytes maxMessageSize) {
NettyChannelBuilder channelBuilder =
NettyChannelBuilder.forTarget(address);
@@ -155,7 +155,7 @@ public class GrpcClientProtocolClient implements Closeable {
return channelBuilder.flowControlWindow(flowControlWindow.getSizeInt())
.maxInboundMessageSize(maxMessageSize.getSizeInt())
- .intercept(monitoringInterceptor)
+ .intercept(metricClientInterceptor)
.build();
}
@@ -172,6 +172,7 @@ public class GrpcClientProtocolClient implements Closeable {
GrpcUtil.shutdownManagedChannel(adminChannel);
}
scheduler.close();
+ metricClientInterceptor.close();
}
RaftClientReplyProto groupAdd(GroupManagementRequestProto request) throws IOException {
diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/metrics/intercept/client/MetricClientInterceptor.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/metrics/intercept/client/MetricClientInterceptor.java
index 7597687..85ccba1 100644
--- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/metrics/intercept/client/MetricClientInterceptor.java
+++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/metrics/intercept/client/MetricClientInterceptor.java
@@ -21,13 +21,15 @@ package org.apache.ratis.grpc.metrics.intercept.client;
import org.apache.ratis.grpc.metrics.MessageMetrics;
import org.apache.ratis.thirdparty.io.grpc.*;
+import java.io.Closeable;
+
/**
* An implementation of a client interceptor.
* Intercepts the messages and increments metrics accordingly
* before sending them.
*/
-public class MetricClientInterceptor implements ClientInterceptor {
+public class MetricClientInterceptor implements ClientInterceptor, Closeable {
private final String identifier;
private final MessageMetrics metrics;
@@ -53,4 +55,9 @@ public class MetricClientInterceptor implements ClientInterceptor {
getMethodMetricPrefix(methodDescriptor)
);
}
+
+ @Override
+ public void close() {
+ metrics.unregister();
+ }
}
diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/metrics/intercept/server/MetricServerInterceptor.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/metrics/intercept/server/MetricServerInterceptor.java
index 6ec5468..a11d5ca 100644
--- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/metrics/intercept/server/MetricServerInterceptor.java
+++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/metrics/intercept/server/MetricServerInterceptor.java
@@ -26,6 +26,7 @@ import org.apache.ratis.thirdparty.io.grpc.ServerCallHandler;
import org.apache.ratis.grpc.metrics.MessageMetrics;
import org.apache.ratis.thirdparty.io.grpc.ServerInterceptor;
+import java.io.Closeable;
import java.util.function.Supplier;
/**
@@ -34,7 +35,7 @@ import java.util.function.Supplier;
* before handling them.
*/
-public class MetricServerInterceptor implements ServerInterceptor {
+public class MetricServerInterceptor implements ServerInterceptor, Closeable {
private String identifier;
private MessageMetrics metrics;
private final Supplier<RaftPeerId> peerIdSupplier;
@@ -77,4 +78,12 @@ public class MetricServerInterceptor implements ServerInterceptor {
return new MetricServerCallListener<>(
next.startCall(monitoringCall, requestHeaders), metricNamePrefix, metrics);
}
+
+ @Override
+ public void close() {
+ final MessageMetrics m = metrics;
+ if (m != null) {
+ m.unregister();
+ }
+ }
}
diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcService.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcService.java
index a618678..0fbdbfc 100644
--- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcService.java
+++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcService.java
@@ -270,6 +270,8 @@ public final class GrpcService extends RaftServerRpcWithProxy<GrpcServerProtocol
}
LOG.info("{} successfully", name);
}
+
+ serverInterceptor.close();
}
@Override
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/memory/MemoryRaftLog.java b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/memory/MemoryRaftLog.java
index 6f80cdd..41caef4 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/memory/MemoryRaftLog.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/memory/MemoryRaftLog.java
@@ -28,6 +28,7 @@ import org.apache.ratis.server.storage.RaftStorageMetadata;
import org.apache.ratis.util.AutoCloseableLock;
import org.apache.ratis.util.Preconditions;
+import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
@@ -87,6 +88,12 @@ public class MemoryRaftLog extends RaftLogBase {
}
@Override
+ public void close() throws IOException {
+ super.close();
+ metrics.unregister();
+ }
+
+ @Override
public RaftLogMetricsBase getRaftLogMetrics() {
return metrics;
}