You are viewing a plain-text version of this content. The canonical link for it was not preserved in this plain-text copy.
Posted to commits@ratis.apache.org by sz...@apache.org on 2021/06/29 00:45:58 UTC

[ratis] branch master updated: RATIS-1386. Some MetricsRegistry instances never unregistered (#486)

This is an automated email from the ASF dual-hosted git repository.

szetszwo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ratis.git


The following commit(s) were added to refs/heads/master by this push:
     new 9b57296  RATIS-1386. Some MetricsRegistry instances never unregistered (#486)
9b57296 is described below

commit 9b57296a38ecb87cb74eb7895e6e9f1e201d70c1
Author: Doroszlai, Attila <64...@users.noreply.github.com>
AuthorDate: Tue Jun 29 02:45:52 2021 +0200

    RATIS-1386. Some MetricsRegistry instances never unregistered (#486)
---
 .../apache/ratis/grpc/client/GrpcClientProtocolClient.java  | 13 +++++++------
 .../metrics/intercept/client/MetricClientInterceptor.java   |  9 ++++++++-
 .../metrics/intercept/server/MetricServerInterceptor.java   | 11 ++++++++++-
 .../main/java/org/apache/ratis/grpc/server/GrpcService.java |  2 ++
 .../apache/ratis/server/raftlog/memory/MemoryRaftLog.java   |  7 +++++++
 5 files changed, 34 insertions(+), 8 deletions(-)

diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientProtocolClient.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientProtocolClient.java
index dc8def1..48084c9 100644
--- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientProtocolClient.java
+++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientProtocolClient.java
@@ -92,6 +92,7 @@ public class GrpcClientProtocolClient implements Closeable {
   private final AtomicReference<AsyncStreamObservers> orderedStreamObservers = new AtomicReference<>();
 
   private final AtomicReference<AsyncStreamObservers> unorderedStreamObservers = new AtomicReference<>();
+  private final MetricClientInterceptor metricClientInterceptor;
 
   GrpcClientProtocolClient(ClientId id, RaftPeer target, RaftProperties properties,
       GrpcTlsConfig adminTlsConfig, GrpcTlsConfig clientTlsConfig) {
@@ -99,7 +100,7 @@ public class GrpcClientProtocolClient implements Closeable {
     this.target = target;
     final SizeInBytes flowControlWindow = GrpcConfigKeys.flowControlWindow(properties, LOG::debug);
     final SizeInBytes maxMessageSize = GrpcConfigKeys.messageSizeMax(properties, LOG::debug);
-    final MetricClientInterceptor monitoringInterceptor = new MetricClientInterceptor(getName());
+    metricClientInterceptor = new MetricClientInterceptor(getName());
 
     final String clientAddress = Optional.ofNullable(target.getClientAddress())
         .filter(x -> !x.isEmpty()).orElse(target.getAddress());
@@ -108,10 +109,10 @@ public class GrpcClientProtocolClient implements Closeable {
     final boolean separateAdminChannel = !Objects.equals(clientAddress, adminAddress);
 
     clientChannel = buildChannel(clientAddress, clientTlsConfig,
-        flowControlWindow, maxMessageSize, monitoringInterceptor);
+        flowControlWindow, maxMessageSize);
     adminChannel = separateAdminChannel
         ? buildChannel(adminAddress, adminTlsConfig,
-            flowControlWindow, maxMessageSize, monitoringInterceptor)
+            flowControlWindow, maxMessageSize)
         : clientChannel;
 
     asyncStub = RaftClientProtocolServiceGrpc.newStub(clientChannel);
@@ -122,8 +123,7 @@ public class GrpcClientProtocolClient implements Closeable {
   }
 
   private ManagedChannel buildChannel(String address, GrpcTlsConfig tlsConf,
-      SizeInBytes flowControlWindow, SizeInBytes maxMessageSize,
-      MetricClientInterceptor monitoringInterceptor) {
+      SizeInBytes flowControlWindow, SizeInBytes maxMessageSize) {
     NettyChannelBuilder channelBuilder =
         NettyChannelBuilder.forTarget(address);
 
@@ -155,7 +155,7 @@ public class GrpcClientProtocolClient implements Closeable {
 
     return channelBuilder.flowControlWindow(flowControlWindow.getSizeInt())
         .maxInboundMessageSize(maxMessageSize.getSizeInt())
-        .intercept(monitoringInterceptor)
+        .intercept(metricClientInterceptor)
         .build();
   }
 
@@ -172,6 +172,7 @@ public class GrpcClientProtocolClient implements Closeable {
       GrpcUtil.shutdownManagedChannel(adminChannel);
     }
     scheduler.close();
+    metricClientInterceptor.close();
   }
 
   RaftClientReplyProto groupAdd(GroupManagementRequestProto request) throws IOException {
diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/metrics/intercept/client/MetricClientInterceptor.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/metrics/intercept/client/MetricClientInterceptor.java
index 7597687..85ccba1 100644
--- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/metrics/intercept/client/MetricClientInterceptor.java
+++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/metrics/intercept/client/MetricClientInterceptor.java
@@ -21,13 +21,15 @@ package org.apache.ratis.grpc.metrics.intercept.client;
 import org.apache.ratis.grpc.metrics.MessageMetrics;
 import org.apache.ratis.thirdparty.io.grpc.*;
 
+import java.io.Closeable;
+
 /**
  * An implementation of a client interceptor.
  * Intercepts the messages and increments metrics accordingly
  * before sending them.
  */
 
-public class MetricClientInterceptor implements ClientInterceptor {
+public class MetricClientInterceptor implements ClientInterceptor, Closeable {
   private final String identifier;
   private final MessageMetrics metrics;
 
@@ -53,4 +55,9 @@ public class MetricClientInterceptor implements ClientInterceptor {
         getMethodMetricPrefix(methodDescriptor)
     );
   }
+
+  @Override
+  public void close() {
+    metrics.unregister();
+  }
 }
diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/metrics/intercept/server/MetricServerInterceptor.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/metrics/intercept/server/MetricServerInterceptor.java
index 6ec5468..a11d5ca 100644
--- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/metrics/intercept/server/MetricServerInterceptor.java
+++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/metrics/intercept/server/MetricServerInterceptor.java
@@ -26,6 +26,7 @@ import org.apache.ratis.thirdparty.io.grpc.ServerCallHandler;
 import org.apache.ratis.grpc.metrics.MessageMetrics;
 import org.apache.ratis.thirdparty.io.grpc.ServerInterceptor;
 
+import java.io.Closeable;
 import java.util.function.Supplier;
 
 /**
@@ -34,7 +35,7 @@ import java.util.function.Supplier;
  * before handling them.
  */
 
-public class MetricServerInterceptor implements ServerInterceptor {
+public class MetricServerInterceptor implements ServerInterceptor, Closeable {
   private String identifier;
   private MessageMetrics metrics;
   private final Supplier<RaftPeerId> peerIdSupplier;
@@ -77,4 +78,12 @@ public class MetricServerInterceptor implements ServerInterceptor {
     return new MetricServerCallListener<>(
         next.startCall(monitoringCall, requestHeaders), metricNamePrefix, metrics);
   }
+
+  @Override
+  public void close() {
+    final MessageMetrics m = metrics;
+    if (m != null) {
+      m.unregister();
+    }
+  }
 }
diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcService.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcService.java
index a618678..0fbdbfc 100644
--- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcService.java
+++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcService.java
@@ -270,6 +270,8 @@ public final class GrpcService extends RaftServerRpcWithProxy<GrpcServerProtocol
       }
       LOG.info("{} successfully", name);
     }
+
+    serverInterceptor.close();
   }
 
   @Override
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/memory/MemoryRaftLog.java b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/memory/MemoryRaftLog.java
index 6f80cdd..41caef4 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/memory/MemoryRaftLog.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/memory/MemoryRaftLog.java
@@ -28,6 +28,7 @@ import org.apache.ratis.server.storage.RaftStorageMetadata;
 import org.apache.ratis.util.AutoCloseableLock;
 import org.apache.ratis.util.Preconditions;
 
+import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
@@ -87,6 +88,12 @@ public class MemoryRaftLog extends RaftLogBase {
   }
 
   @Override
+  public void close() throws IOException {
+    super.close();
+    metrics.unregister();
+  }
+
+  @Override
   public RaftLogMetricsBase getRaftLogMetrics() {
     return metrics;
   }