You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@skywalking.apache.org by wu...@apache.org on 2020/12/12 04:35:22 UTC
[skywalking] branch master updated: Fix potential gRPC connection
leak(not closed) for the channels among OAP instances. (#5995)
This is an automated email from the ASF dual-hosted git repository.
wusheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/skywalking.git
The following commit(s) were added to refs/heads/master by this push:
new f513726 Fix potential gRPC connection leak(not closed) for the channels among OAP instances. (#5995)
f513726 is described below
commit f513726fead7543e2b90acea079e6c7b50523c01
Author: 吴晟 Wu Sheng <wu...@foxmail.com>
AuthorDate: Sat Dec 12 12:35:03 2020 +0800
Fix potential gRPC connection leak(not closed) for the channels among OAP instances. (#5995)
* Fix a potential gRPC connection leak (channels not closed) among OAP instances.
* Filter out OAP instances whose pod IP is still empty (not yet assigned during the booting stage) in KubernetesCoordinator.
---
CHANGES.md | 2 ++
.../plugin/kubernetes/KubernetesCoordinator.java | 24 ++++++++------
.../core/remote/client/RemoteClientManager.java | 37 ++++++++++++++++------
3 files changed, 44 insertions(+), 19 deletions(-)
diff --git a/CHANGES.md b/CHANGES.md
index 1ed21e7..d893720 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -20,6 +20,8 @@ Release Notes.
* Add the rule name field to alarm record storage entity as a part of ID, to support multiple alarm rules triggered for one entity. The scope id has been removed from the ID.
* Fix MAL concurrent execution issues.
* Fix group name can't be query in the GraphQL.
+* Fix potential gRPC connection leak(not closed) for the channels among OAP instances.
+* Filter OAP instances(unassigned in booting stage) of the empty IP in KubernetesCoordinator.
#### UI
* Fix un-removed tags in trace query.
diff --git a/oap-server/server-cluster-plugin/cluster-kubernetes-plugin/src/main/java/org/apache/skywalking/oap/server/cluster/plugin/kubernetes/KubernetesCoordinator.java b/oap-server/server-cluster-plugin/cluster-kubernetes-plugin/src/main/java/org/apache/skywalking/oap/server/cluster/plugin/kubernetes/KubernetesCoordinator.java
index 3ca7736..9e56f3c 100644
--- a/oap-server/server-cluster-plugin/cluster-kubernetes-plugin/src/main/java/org/apache/skywalking/oap/server/cluster/plugin/kubernetes/KubernetesCoordinator.java
+++ b/oap-server/server-cluster-plugin/cluster-kubernetes-plugin/src/main/java/org/apache/skywalking/oap/server/cluster/plugin/kubernetes/KubernetesCoordinator.java
@@ -21,12 +21,11 @@ package org.apache.skywalking.oap.server.cluster.plugin.kubernetes;
import io.kubernetes.client.openapi.models.V1ObjectMeta;
import io.kubernetes.client.openapi.models.V1Pod;
import io.kubernetes.client.openapi.models.V1PodStatus;
-
import java.util.Collections;
import java.util.List;
import java.util.stream.Collectors;
-
import lombok.extern.slf4j.Slf4j;
+import org.apache.skywalking.apm.util.StringUtil;
import org.apache.skywalking.oap.server.core.CoreModule;
import org.apache.skywalking.oap.server.core.cluster.ClusterNodesQuery;
import org.apache.skywalking.oap.server.core.cluster.ClusterRegister;
@@ -66,23 +65,25 @@ public class KubernetesCoordinator implements ClusterRegister, ClusterNodesQuery
List<V1Pod> pods = NamespacedPodListInformer.INFORMER.listPods().orElseGet(this::selfPod);
if (log.isDebugEnabled()) {
List<String> uidList = pods
- .stream()
- .map(item -> item.getMetadata().getUid())
- .collect(Collectors.toList());
+ .stream()
+ .map(item -> item.getMetadata().getUid())
+ .collect(Collectors.toList());
log.debug("[kubernetes cluster pods uid list]:{}", uidList.toString());
}
if (port == -1) {
port = manager.find(CoreModule.NAME).provider().getService(ConfigService.class).getGRPCPort();
}
- List<RemoteInstance> remoteInstances = pods.stream()
+ List<RemoteInstance> remoteInstances =
+ pods.stream()
+ .filter(pod -> StringUtil.isNotBlank(pod.getStatus().getPodIP()))
.map(pod -> new RemoteInstance(
- new Address(pod.getStatus().getPodIP(), port, pod.getMetadata().getUid().equals(uid))))
+ new Address(pod.getStatus().getPodIP(), port, pod.getMetadata().getUid().equals(uid))))
.collect(Collectors.toList());
healthChecker.health();
return remoteInstances;
} catch (Throwable e) {
healthChecker.unHealth(e);
- throw new ServiceQueryException(e.getMessage());
+ throw new ServiceQueryException(e.getMessage());
}
}
@@ -100,8 +101,11 @@ public class KubernetesCoordinator implements ClusterRegister, ClusterNodesQuery
private void initHealthChecker() {
if (healthChecker == null) {
- MetricsCreator metricCreator = manager.find(TelemetryModule.NAME).provider().getService(MetricsCreator.class);
- healthChecker = metricCreator.createHealthCheckerGauge("cluster_k8s", MetricsTag.EMPTY_KEY, MetricsTag.EMPTY_VALUE);
+ MetricsCreator metricCreator = manager.find(TelemetryModule.NAME)
+ .provider()
+ .getService(MetricsCreator.class);
+ healthChecker = metricCreator.createHealthCheckerGauge(
+ "cluster_k8s", MetricsTag.EMPTY_KEY, MetricsTag.EMPTY_VALUE);
}
}
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/client/RemoteClientManager.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/client/RemoteClientManager.java
index 442b1e5..c7ee112 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/client/RemoteClientManager.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/client/RemoteClientManager.java
@@ -65,9 +65,10 @@ public class RemoteClientManager implements Service {
/**
* Initial the manager for all remote communication clients.
+ *
* @param moduleDefineHolder for looking up other modules
* @param remoteTimeout for cluster internal communication, in second unit.
- * @param trustedCAFile SslContext to verify server certificates.
+ * @param trustedCAFile SslContext to verify server certificates.
*/
public RemoteClientManager(ModuleDefineHolder moduleDefineHolder,
int remoteTimeout,
@@ -80,7 +81,8 @@ public class RemoteClientManager implements Service {
* Initial the manager for all remote communication clients.
*
* Initial the manager for all remote communication clients.
- * @param moduleDefineHolder for looking up other modules
+ *
+ * @param moduleDefineHolder for looking up other modules
* @param remoteTimeout for cluster internal communication, in second unit.
*/
public RemoteClientManager(final ModuleDefineHolder moduleDefineHolder, final int remoteTimeout) {
@@ -103,7 +105,10 @@ public class RemoteClientManager implements Service {
gauge = moduleDefineHolder.find(TelemetryModule.NAME)
.provider()
.getService(MetricsCreator.class)
- .createGauge("cluster_size", "Cluster size of current oap node", MetricsTag.EMPTY_KEY, MetricsTag.EMPTY_VALUE);
+ .createGauge(
+ "cluster_size", "Cluster size of current oap node", MetricsTag.EMPTY_KEY,
+ MetricsTag.EMPTY_VALUE
+ );
}
try {
if (Objects.isNull(clusterNodesQuery)) {
@@ -187,13 +192,24 @@ public class RemoteClientManager implements Service {
* @param remoteInstances Remote instance collection by query cluster config.
*/
private void reBuildRemoteClients(List<RemoteInstance> remoteInstances) {
- final Map<Address, RemoteClientAction> remoteClientCollection = this.usingClients.stream()
- .collect(Collectors.toMap(RemoteClient::getAddress, client -> new RemoteClientAction(client, Action.Close)));
+ final Map<Address, RemoteClientAction> remoteClientCollection =
+ this.usingClients.stream()
+ .collect(Collectors.toMap(
+ RemoteClient::getAddress,
+ client -> new RemoteClientAction(
+ client, Action.Close)
+ ));
- final Map<Address, RemoteClientAction> latestRemoteClients = remoteInstances.stream()
- .collect(Collectors.toMap(RemoteInstance::getAddress, remote -> new RemoteClientAction(null, Action.Create)));
+ final Map<Address, RemoteClientAction> latestRemoteClients =
+ remoteInstances.stream()
+ .collect(Collectors.toMap(
+ RemoteInstance::getAddress,
+ remote -> new RemoteClientAction(
+ null, Action.Create)
+ ));
- final Set<Address> unChangeAddresses = Sets.intersection(remoteClientCollection.keySet(), latestRemoteClients.keySet());
+ final Set<Address> unChangeAddresses = Sets.intersection(
+ remoteClientCollection.keySet(), latestRemoteClients.keySet());
unChangeAddresses.stream()
.filter(remoteClientCollection::containsKey)
@@ -230,7 +246,10 @@ public class RemoteClientManager implements Service {
remoteClientCollection.values()
.stream()
- .filter(remoteClientAction -> remoteClientAction.getAction().equals(Action.Close))
+ .filter(remoteClientAction ->
+ remoteClientAction.getAction().equals(Action.Close)
+ && !remoteClientAction.getRemoteClient().getAddress().isSelf()
+ )
.forEach(remoteClientAction -> remoteClientAction.getRemoteClient().close());
}