You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@skywalking.apache.org by wu...@apache.org on 2020/12/12 01:48:24 UTC
[skywalking] 01/01: Fix potential gRPC connection leak(not closed)
for the channels among OAP instances.
This is an automated email from the ASF dual-hosted git repository.
wusheng pushed a commit to branch refresh-npe
in repository https://gitbox.apache.org/repos/asf/skywalking.git
commit c707e4a4cabb11bb022ea7f4dedff31ec61cc8ac
Author: Wu Sheng <wu...@foxmail.com>
AuthorDate: Sat Dec 12 09:47:58 2020 +0800
Fix potential gRPC connection leak(not closed) for the channels among OAP instances.
---
CHANGES.md | 1 +
.../core/remote/client/RemoteClientManager.java | 37 ++++++++++++++++------
2 files changed, 29 insertions(+), 9 deletions(-)
diff --git a/CHANGES.md b/CHANGES.md
index 1ed21e7..70877f4 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -20,6 +20,7 @@ Release Notes.
* Add the rule name field to alarm record storage entity as a part of ID, to support multiple alarm rules triggered for one entity. The scope id has been removed from the ID.
* Fix MAL concurrent execution issues.
* Fix group name can't be query in the GraphQL.
+* Fix potential gRPC connection leak(not closed) for the channels among OAP instances.
#### UI
* Fix un-removed tags in trace query.
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/client/RemoteClientManager.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/client/RemoteClientManager.java
index 442b1e5..c7ee112 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/client/RemoteClientManager.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/client/RemoteClientManager.java
@@ -65,9 +65,10 @@ public class RemoteClientManager implements Service {
/**
* Initial the manager for all remote communication clients.
+ *
* @param moduleDefineHolder for looking up other modules
* @param remoteTimeout for cluster internal communication, in second unit.
- * @param trustedCAFile SslContext to verify server certificates.
+ * @param trustedCAFile SslContext to verify server certificates.
*/
public RemoteClientManager(ModuleDefineHolder moduleDefineHolder,
int remoteTimeout,
@@ -80,7 +81,8 @@ public class RemoteClientManager implements Service {
* Initial the manager for all remote communication clients.
*
* Initial the manager for all remote communication clients.
- * @param moduleDefineHolder for looking up other modules
+ *
+ * @param moduleDefineHolder for looking up other modules
* @param remoteTimeout for cluster internal communication, in second unit.
*/
public RemoteClientManager(final ModuleDefineHolder moduleDefineHolder, final int remoteTimeout) {
@@ -103,7 +105,10 @@ public class RemoteClientManager implements Service {
gauge = moduleDefineHolder.find(TelemetryModule.NAME)
.provider()
.getService(MetricsCreator.class)
- .createGauge("cluster_size", "Cluster size of current oap node", MetricsTag.EMPTY_KEY, MetricsTag.EMPTY_VALUE);
+ .createGauge(
+ "cluster_size", "Cluster size of current oap node", MetricsTag.EMPTY_KEY,
+ MetricsTag.EMPTY_VALUE
+ );
}
try {
if (Objects.isNull(clusterNodesQuery)) {
@@ -187,13 +192,24 @@ public class RemoteClientManager implements Service {
* @param remoteInstances Remote instance collection by query cluster config.
*/
private void reBuildRemoteClients(List<RemoteInstance> remoteInstances) {
- final Map<Address, RemoteClientAction> remoteClientCollection = this.usingClients.stream()
- .collect(Collectors.toMap(RemoteClient::getAddress, client -> new RemoteClientAction(client, Action.Close)));
+ final Map<Address, RemoteClientAction> remoteClientCollection =
+ this.usingClients.stream()
+ .collect(Collectors.toMap(
+ RemoteClient::getAddress,
+ client -> new RemoteClientAction(
+ client, Action.Close)
+ ));
- final Map<Address, RemoteClientAction> latestRemoteClients = remoteInstances.stream()
- .collect(Collectors.toMap(RemoteInstance::getAddress, remote -> new RemoteClientAction(null, Action.Create)));
+ final Map<Address, RemoteClientAction> latestRemoteClients =
+ remoteInstances.stream()
+ .collect(Collectors.toMap(
+ RemoteInstance::getAddress,
+ remote -> new RemoteClientAction(
+ null, Action.Create)
+ ));
- final Set<Address> unChangeAddresses = Sets.intersection(remoteClientCollection.keySet(), latestRemoteClients.keySet());
+ final Set<Address> unChangeAddresses = Sets.intersection(
+ remoteClientCollection.keySet(), latestRemoteClients.keySet());
unChangeAddresses.stream()
.filter(remoteClientCollection::containsKey)
@@ -230,7 +246,10 @@ public class RemoteClientManager implements Service {
remoteClientCollection.values()
.stream()
- .filter(remoteClientAction -> remoteClientAction.getAction().equals(Action.Close))
+ .filter(remoteClientAction ->
+ remoteClientAction.getAction().equals(Action.Close)
+ && !remoteClientAction.getRemoteClient().getAddress().isSelf()
+ )
.forEach(remoteClientAction -> remoteClientAction.getRemoteClient().close());
}