You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@skywalking.apache.org by wu...@apache.org on 2020/12/12 01:48:24 UTC

[skywalking] 01/01: Fix potential gRPC connection leak (not closed) for the channels among OAP instances.

This is an automated email from the ASF dual-hosted git repository.

wusheng pushed a commit to branch refresh-npe
in repository https://gitbox.apache.org/repos/asf/skywalking.git

commit c707e4a4cabb11bb022ea7f4dedff31ec61cc8ac
Author: Wu Sheng <wu...@foxmail.com>
AuthorDate: Sat Dec 12 09:47:58 2020 +0800

    Fix potential gRPC connection leak (not closed) for the channels among OAP instances.
---
 CHANGES.md                                         |  1 +
 .../core/remote/client/RemoteClientManager.java    | 37 ++++++++++++++++------
 2 files changed, 29 insertions(+), 9 deletions(-)

diff --git a/CHANGES.md b/CHANGES.md
index 1ed21e7..70877f4 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -20,6 +20,7 @@ Release Notes.
 * Add the rule name field to alarm record storage entity as a part of ID, to support multiple alarm rules triggered for one entity. The scope id has been removed from the ID.
 * Fix MAL concurrent execution issues.
 * Fix group name can't be query in the GraphQL.
+* Fix potential gRPC connection leak(not closed) for the channels among OAP instances. 
 
 #### UI
 * Fix un-removed tags in trace query.
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/client/RemoteClientManager.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/client/RemoteClientManager.java
index 442b1e5..c7ee112 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/client/RemoteClientManager.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/client/RemoteClientManager.java
@@ -65,9 +65,10 @@ public class RemoteClientManager implements Service {
 
     /**
      * Initial the manager for all remote communication clients.
+     *
      * @param moduleDefineHolder for looking up other modules
      * @param remoteTimeout      for cluster internal communication, in second unit.
-     * @param trustedCAFile         SslContext to verify server certificates.
+     * @param trustedCAFile      SslContext to verify server certificates.
      */
     public RemoteClientManager(ModuleDefineHolder moduleDefineHolder,
                                int remoteTimeout,
@@ -80,7 +81,8 @@ public class RemoteClientManager implements Service {
      * Initial the manager for all remote communication clients.
      *
      * Initial the manager for all remote communication clients.
-     *  @param moduleDefineHolder for looking up other modules
+     *
+     * @param moduleDefineHolder for looking up other modules
      * @param remoteTimeout      for cluster internal communication, in second unit.
      */
     public RemoteClientManager(final ModuleDefineHolder moduleDefineHolder, final int remoteTimeout) {
@@ -103,7 +105,10 @@ public class RemoteClientManager implements Service {
             gauge = moduleDefineHolder.find(TelemetryModule.NAME)
                                       .provider()
                                       .getService(MetricsCreator.class)
-                                      .createGauge("cluster_size", "Cluster size of current oap node", MetricsTag.EMPTY_KEY, MetricsTag.EMPTY_VALUE);
+                                      .createGauge(
+                                          "cluster_size", "Cluster size of current oap node", MetricsTag.EMPTY_KEY,
+                                          MetricsTag.EMPTY_VALUE
+                                      );
         }
         try {
             if (Objects.isNull(clusterNodesQuery)) {
@@ -187,13 +192,24 @@ public class RemoteClientManager implements Service {
      * @param remoteInstances Remote instance collection by query cluster config.
      */
     private void reBuildRemoteClients(List<RemoteInstance> remoteInstances) {
-        final Map<Address, RemoteClientAction> remoteClientCollection = this.usingClients.stream()
-                                                                                         .collect(Collectors.toMap(RemoteClient::getAddress, client -> new RemoteClientAction(client, Action.Close)));
+        final Map<Address, RemoteClientAction> remoteClientCollection =
+            this.usingClients.stream()
+                             .collect(Collectors.toMap(
+                                 RemoteClient::getAddress,
+                                 client -> new RemoteClientAction(
+                                     client, Action.Close)
+                             ));
 
-        final Map<Address, RemoteClientAction> latestRemoteClients = remoteInstances.stream()
-                                                                                    .collect(Collectors.toMap(RemoteInstance::getAddress, remote -> new RemoteClientAction(null, Action.Create)));
+        final Map<Address, RemoteClientAction> latestRemoteClients =
+            remoteInstances.stream()
+                           .collect(Collectors.toMap(
+                               RemoteInstance::getAddress,
+                               remote -> new RemoteClientAction(
+                                   null, Action.Create)
+                           ));
 
-        final Set<Address> unChangeAddresses = Sets.intersection(remoteClientCollection.keySet(), latestRemoteClients.keySet());
+        final Set<Address> unChangeAddresses = Sets.intersection(
+            remoteClientCollection.keySet(), latestRemoteClients.keySet());
 
         unChangeAddresses.stream()
                          .filter(remoteClientCollection::containsKey)
@@ -230,7 +246,10 @@ public class RemoteClientManager implements Service {
 
         remoteClientCollection.values()
                               .stream()
-                              .filter(remoteClientAction -> remoteClientAction.getAction().equals(Action.Close))
+                              .filter(remoteClientAction ->
+                                          remoteClientAction.getAction().equals(Action.Close)
+                                              && !remoteClientAction.getRemoteClient().getAddress().isSelf()
+                              )
                               .forEach(remoteClientAction -> remoteClientAction.getRemoteClient().close());
     }