You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@skywalking.apache.org by wu...@apache.org on 2020/12/12 04:35:22 UTC

[skywalking] branch master updated: Fix potential gRPC connection leak(not closed) for the channels among OAP instances. (#5995)

This is an automated email from the ASF dual-hosted git repository.

wusheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/skywalking.git


The following commit(s) were added to refs/heads/master by this push:
     new f513726  Fix potential gRPC connection leak(not closed) for the channels among OAP instances.  (#5995)
f513726 is described below

commit f513726fead7543e2b90acea079e6c7b50523c01
Author: 吴晟 Wu Sheng <wu...@foxmail.com>
AuthorDate: Sat Dec 12 12:35:03 2020 +0800

    Fix potential gRPC connection leak (not closed) for the channels among OAP instances. (#5995)
    
    * Fix potential gRPC connection leak (not closed) for the channels among OAP instances.
    
    * Filter out OAP instances with an empty IP (not yet assigned during the booting stage) in KubernetesCoordinator.
---
 CHANGES.md                                         |  2 ++
 .../plugin/kubernetes/KubernetesCoordinator.java   | 24 ++++++++------
 .../core/remote/client/RemoteClientManager.java    | 37 ++++++++++++++++------
 3 files changed, 44 insertions(+), 19 deletions(-)

diff --git a/CHANGES.md b/CHANGES.md
index 1ed21e7..d893720 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -20,6 +20,8 @@ Release Notes.
 * Add the rule name field to alarm record storage entity as a part of ID, to support multiple alarm rules triggered for one entity. The scope id has been removed from the ID.
 * Fix MAL concurrent execution issues.
 * Fix group name can't be query in the GraphQL.
+* Fix potential gRPC connection leak(not closed) for the channels among OAP instances.
+* Filter OAP instances(unassigned in booting stage) of the empty IP in KubernetesCoordinator.
 
 #### UI
 * Fix un-removed tags in trace query.
diff --git a/oap-server/server-cluster-plugin/cluster-kubernetes-plugin/src/main/java/org/apache/skywalking/oap/server/cluster/plugin/kubernetes/KubernetesCoordinator.java b/oap-server/server-cluster-plugin/cluster-kubernetes-plugin/src/main/java/org/apache/skywalking/oap/server/cluster/plugin/kubernetes/KubernetesCoordinator.java
index 3ca7736..9e56f3c 100644
--- a/oap-server/server-cluster-plugin/cluster-kubernetes-plugin/src/main/java/org/apache/skywalking/oap/server/cluster/plugin/kubernetes/KubernetesCoordinator.java
+++ b/oap-server/server-cluster-plugin/cluster-kubernetes-plugin/src/main/java/org/apache/skywalking/oap/server/cluster/plugin/kubernetes/KubernetesCoordinator.java
@@ -21,12 +21,11 @@ package org.apache.skywalking.oap.server.cluster.plugin.kubernetes;
 import io.kubernetes.client.openapi.models.V1ObjectMeta;
 import io.kubernetes.client.openapi.models.V1Pod;
 import io.kubernetes.client.openapi.models.V1PodStatus;
-
 import java.util.Collections;
 import java.util.List;
 import java.util.stream.Collectors;
-
 import lombok.extern.slf4j.Slf4j;
+import org.apache.skywalking.apm.util.StringUtil;
 import org.apache.skywalking.oap.server.core.CoreModule;
 import org.apache.skywalking.oap.server.core.cluster.ClusterNodesQuery;
 import org.apache.skywalking.oap.server.core.cluster.ClusterRegister;
@@ -66,23 +65,25 @@ public class KubernetesCoordinator implements ClusterRegister, ClusterNodesQuery
             List<V1Pod> pods = NamespacedPodListInformer.INFORMER.listPods().orElseGet(this::selfPod);
             if (log.isDebugEnabled()) {
                 List<String> uidList = pods
-                        .stream()
-                        .map(item -> item.getMetadata().getUid())
-                        .collect(Collectors.toList());
+                    .stream()
+                    .map(item -> item.getMetadata().getUid())
+                    .collect(Collectors.toList());
                 log.debug("[kubernetes cluster pods uid list]:{}", uidList.toString());
             }
             if (port == -1) {
                 port = manager.find(CoreModule.NAME).provider().getService(ConfigService.class).getGRPCPort();
             }
-            List<RemoteInstance> remoteInstances =  pods.stream()
+            List<RemoteInstance> remoteInstances =
+                pods.stream()
+                    .filter(pod -> StringUtil.isNotBlank(pod.getStatus().getPodIP()))
                     .map(pod -> new RemoteInstance(
-                            new Address(pod.getStatus().getPodIP(), port, pod.getMetadata().getUid().equals(uid))))
+                        new Address(pod.getStatus().getPodIP(), port, pod.getMetadata().getUid().equals(uid))))
                     .collect(Collectors.toList());
             healthChecker.health();
             return remoteInstances;
         } catch (Throwable e) {
             healthChecker.unHealth(e);
-            throw  new ServiceQueryException(e.getMessage());
+            throw new ServiceQueryException(e.getMessage());
         }
     }
 
@@ -100,8 +101,11 @@ public class KubernetesCoordinator implements ClusterRegister, ClusterNodesQuery
 
     private void initHealthChecker() {
         if (healthChecker == null) {
-            MetricsCreator metricCreator = manager.find(TelemetryModule.NAME).provider().getService(MetricsCreator.class);
-            healthChecker = metricCreator.createHealthCheckerGauge("cluster_k8s", MetricsTag.EMPTY_KEY, MetricsTag.EMPTY_VALUE);
+            MetricsCreator metricCreator = manager.find(TelemetryModule.NAME)
+                                                  .provider()
+                                                  .getService(MetricsCreator.class);
+            healthChecker = metricCreator.createHealthCheckerGauge(
+                "cluster_k8s", MetricsTag.EMPTY_KEY, MetricsTag.EMPTY_VALUE);
         }
     }
 
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/client/RemoteClientManager.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/client/RemoteClientManager.java
index 442b1e5..c7ee112 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/client/RemoteClientManager.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/client/RemoteClientManager.java
@@ -65,9 +65,10 @@ public class RemoteClientManager implements Service {
 
     /**
      * Initial the manager for all remote communication clients.
+     *
      * @param moduleDefineHolder for looking up other modules
      * @param remoteTimeout      for cluster internal communication, in second unit.
-     * @param trustedCAFile         SslContext to verify server certificates.
+     * @param trustedCAFile      SslContext to verify server certificates.
      */
     public RemoteClientManager(ModuleDefineHolder moduleDefineHolder,
                                int remoteTimeout,
@@ -80,7 +81,8 @@ public class RemoteClientManager implements Service {
      * Initial the manager for all remote communication clients.
      *
      * Initial the manager for all remote communication clients.
-     *  @param moduleDefineHolder for looking up other modules
+     *
+     * @param moduleDefineHolder for looking up other modules
      * @param remoteTimeout      for cluster internal communication, in second unit.
      */
     public RemoteClientManager(final ModuleDefineHolder moduleDefineHolder, final int remoteTimeout) {
@@ -103,7 +105,10 @@ public class RemoteClientManager implements Service {
             gauge = moduleDefineHolder.find(TelemetryModule.NAME)
                                       .provider()
                                       .getService(MetricsCreator.class)
-                                      .createGauge("cluster_size", "Cluster size of current oap node", MetricsTag.EMPTY_KEY, MetricsTag.EMPTY_VALUE);
+                                      .createGauge(
+                                          "cluster_size", "Cluster size of current oap node", MetricsTag.EMPTY_KEY,
+                                          MetricsTag.EMPTY_VALUE
+                                      );
         }
         try {
             if (Objects.isNull(clusterNodesQuery)) {
@@ -187,13 +192,24 @@ public class RemoteClientManager implements Service {
      * @param remoteInstances Remote instance collection by query cluster config.
      */
     private void reBuildRemoteClients(List<RemoteInstance> remoteInstances) {
-        final Map<Address, RemoteClientAction> remoteClientCollection = this.usingClients.stream()
-                                                                                         .collect(Collectors.toMap(RemoteClient::getAddress, client -> new RemoteClientAction(client, Action.Close)));
+        final Map<Address, RemoteClientAction> remoteClientCollection =
+            this.usingClients.stream()
+                             .collect(Collectors.toMap(
+                                 RemoteClient::getAddress,
+                                 client -> new RemoteClientAction(
+                                     client, Action.Close)
+                             ));
 
-        final Map<Address, RemoteClientAction> latestRemoteClients = remoteInstances.stream()
-                                                                                    .collect(Collectors.toMap(RemoteInstance::getAddress, remote -> new RemoteClientAction(null, Action.Create)));
+        final Map<Address, RemoteClientAction> latestRemoteClients =
+            remoteInstances.stream()
+                           .collect(Collectors.toMap(
+                               RemoteInstance::getAddress,
+                               remote -> new RemoteClientAction(
+                                   null, Action.Create)
+                           ));
 
-        final Set<Address> unChangeAddresses = Sets.intersection(remoteClientCollection.keySet(), latestRemoteClients.keySet());
+        final Set<Address> unChangeAddresses = Sets.intersection(
+            remoteClientCollection.keySet(), latestRemoteClients.keySet());
 
         unChangeAddresses.stream()
                          .filter(remoteClientCollection::containsKey)
@@ -230,7 +246,10 @@ public class RemoteClientManager implements Service {
 
         remoteClientCollection.values()
                               .stream()
-                              .filter(remoteClientAction -> remoteClientAction.getAction().equals(Action.Close))
+                              .filter(remoteClientAction ->
+                                          remoteClientAction.getAction().equals(Action.Close)
+                                              && !remoteClientAction.getRemoteClient().getAddress().isSelf()
+                              )
                               .forEach(remoteClientAction -> remoteClientAction.getRemoteClient().close());
     }