You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@skywalking.apache.org by wu...@apache.org on 2019/03/11 13:00:52 UTC
[incubator-skywalking] branch k8s-Coordinator-role updated: Do lock
when port has been intialized.
This is an automated email from the ASF dual-hosted git repository.
wusheng pushed a commit to branch k8s-Coordinator-role
in repository https://gitbox.apache.org/repos/asf/incubator-skywalking.git
The following commit(s) were added to refs/heads/k8s-Coordinator-role by this push:
new a84cb5b Do lock when port has been intialized.
a84cb5b is described below
commit a84cb5b187e0ef50558a1b529905861cec95270c
Author: Wu Sheng <wu...@foxmail.com>
AuthorDate: Mon Mar 11 21:00:44 2019 +0800
Do lock when port has been intialized.
---
.../plugin/kubernetes/KubernetesCoordinator.java | 31 +++++++++++++++++-----
1 file changed, 25 insertions(+), 6 deletions(-)
diff --git a/oap-server/server-cluster-plugin/cluster-kubernetes-plugin/src/main/java/org/apache/skywalking/oap/server/cluster/plugin/kubernetes/KubernetesCoordinator.java b/oap-server/server-cluster-plugin/cluster-kubernetes-plugin/src/main/java/org/apache/skywalking/oap/server/cluster/plugin/kubernetes/KubernetesCoordinator.java
index 14e0f91..c91bd9a 100644
--- a/oap-server/server-cluster-plugin/cluster-kubernetes-plugin/src/main/java/org/apache/skywalking/oap/server/cluster/plugin/kubernetes/KubernetesCoordinator.java
+++ b/oap-server/server-cluster-plugin/cluster-kubernetes-plugin/src/main/java/org/apache/skywalking/oap/server/cluster/plugin/kubernetes/KubernetesCoordinator.java
@@ -22,6 +22,7 @@ import com.google.common.collect.Lists;
import com.google.common.util.concurrent.*;
import java.util.*;
import java.util.concurrent.*;
+import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Supplier;
import javax.annotation.Nullable;
import org.apache.skywalking.oap.server.core.CoreModule;
@@ -50,6 +51,8 @@ public class KubernetesCoordinator implements ClusterRegister, ClusterNodesQuery
private final ReusableWatch<Event> watch;
+ private final ReentrantLock portSetLock;
+
private volatile int port = -1;
KubernetesCoordinator(ModuleManager manager,
@@ -58,6 +61,7 @@ public class KubernetesCoordinator implements ClusterRegister, ClusterNodesQuery
this.watch = watch;
this.uid = uidSupplier.get();
TelemetryRelatedContext.INSTANCE.setId(uid);
+ this.portSetLock = new ReentrantLock();
}
public void start() {
@@ -100,7 +104,16 @@ public class KubernetesCoordinator implements ClusterRegister, ClusterNodesQuery
switch (event.getType()) {
case "ADDED":
case "MODIFIED":
- cache.put(event.getUid(), new RemoteInstance(new Address(event.getHost(), port, event.getUid().equals(this.uid))));
+ if (port == -1) {
+ portSetLock.lock();
+ try {
+ cache.put(event.getUid(), new RemoteInstance(new Address(event.getHost(), port, event.getUid().equals(this.uid))));
+ } finally {
+ portSetLock.unlock();
+ }
+ } else {
+ cache.put(event.getUid(), new RemoteInstance(new Address(event.getHost(), port, event.getUid().equals(this.uid))));
+ }
break;
case "DELETED":
cache.remove(event.getUid());
@@ -113,11 +126,17 @@ public class KubernetesCoordinator implements ClusterRegister, ClusterNodesQuery
@Override public List<RemoteInstance> queryRemoteNodes() {
if (port == -1) {
- logger.debug("Query kubernetes remote, port hasn't init, try to init");
- gRPCConfigService service = manager.find(CoreModule.NAME).provider().getService(gRPCConfigService.class);
- port = service.getPort();
- logger.debug("Query kubernetes remote, port is set at {}", port);
- cache.values().forEach(instance -> instance.getAddress().setPort(port));
+ // Use lock mechanism to avoid concurrency conflict with `generateRemoteNodes`.
+ portSetLock.lock();
+ try {
+ logger.debug("Query kubernetes remote, port hasn't init, try to init");
+ gRPCConfigService service = manager.find(CoreModule.NAME).provider().getService(gRPCConfigService.class);
+ port = service.getPort();
+ logger.debug("Query kubernetes remote, port is set at {}", port);
+ cache.values().forEach(instance -> instance.getAddress().setPort(port));
+ } finally {
+ portSetLock.unlock();
+ }
}
logger.debug("Query kubernetes remote nodes: {}", cache);
return Lists.newArrayList(cache.values());