You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@skywalking.apache.org by wu...@apache.org on 2019/03/11 13:00:52 UTC

[incubator-skywalking] branch k8s-Coordinator-role updated: Do lock when port has been intialized.

This is an automated email from the ASF dual-hosted git repository.

wusheng pushed a commit to branch k8s-Coordinator-role
in repository https://gitbox.apache.org/repos/asf/incubator-skywalking.git


The following commit(s) were added to refs/heads/k8s-Coordinator-role by this push:
     new a84cb5b  Do lock when port has been intialized.
a84cb5b is described below

commit a84cb5b187e0ef50558a1b529905861cec95270c
Author: Wu Sheng <wu...@foxmail.com>
AuthorDate: Mon Mar 11 21:00:44 2019 +0800

    Do lock when port has been intialized.
---
 .../plugin/kubernetes/KubernetesCoordinator.java   | 31 +++++++++++++++++-----
 1 file changed, 25 insertions(+), 6 deletions(-)

diff --git a/oap-server/server-cluster-plugin/cluster-kubernetes-plugin/src/main/java/org/apache/skywalking/oap/server/cluster/plugin/kubernetes/KubernetesCoordinator.java b/oap-server/server-cluster-plugin/cluster-kubernetes-plugin/src/main/java/org/apache/skywalking/oap/server/cluster/plugin/kubernetes/KubernetesCoordinator.java
index 14e0f91..c91bd9a 100644
--- a/oap-server/server-cluster-plugin/cluster-kubernetes-plugin/src/main/java/org/apache/skywalking/oap/server/cluster/plugin/kubernetes/KubernetesCoordinator.java
+++ b/oap-server/server-cluster-plugin/cluster-kubernetes-plugin/src/main/java/org/apache/skywalking/oap/server/cluster/plugin/kubernetes/KubernetesCoordinator.java
@@ -22,6 +22,7 @@ import com.google.common.collect.Lists;
 import com.google.common.util.concurrent.*;
 import java.util.*;
 import java.util.concurrent.*;
+import java.util.concurrent.locks.ReentrantLock;
 import java.util.function.Supplier;
 import javax.annotation.Nullable;
 import org.apache.skywalking.oap.server.core.CoreModule;
@@ -50,6 +51,8 @@ public class KubernetesCoordinator implements ClusterRegister, ClusterNodesQuery
 
     private final ReusableWatch<Event> watch;
 
+    private final ReentrantLock portSetLock;
+
     private volatile int port = -1;
 
     KubernetesCoordinator(ModuleManager manager,
@@ -58,6 +61,7 @@ public class KubernetesCoordinator implements ClusterRegister, ClusterNodesQuery
         this.watch = watch;
         this.uid = uidSupplier.get();
         TelemetryRelatedContext.INSTANCE.setId(uid);
+        this.portSetLock = new ReentrantLock();
     }
 
     public void start() {
@@ -100,7 +104,16 @@ public class KubernetesCoordinator implements ClusterRegister, ClusterNodesQuery
             switch (event.getType()) {
                 case "ADDED":
                 case "MODIFIED":
-                    cache.put(event.getUid(), new RemoteInstance(new Address(event.getHost(), port, event.getUid().equals(this.uid))));
+                    if (port == -1) {
+                        portSetLock.lock();
+                        try {
+                            cache.put(event.getUid(), new RemoteInstance(new Address(event.getHost(), port, event.getUid().equals(this.uid))));
+                        } finally {
+                            portSetLock.unlock();
+                        }
+                    } else {
+                        cache.put(event.getUid(), new RemoteInstance(new Address(event.getHost(), port, event.getUid().equals(this.uid))));
+                    }
                     break;
                 case "DELETED":
                     cache.remove(event.getUid());
@@ -113,11 +126,17 @@ public class KubernetesCoordinator implements ClusterRegister, ClusterNodesQuery
 
     @Override public List<RemoteInstance> queryRemoteNodes() {
         if (port == -1) {
-            logger.debug("Query kubernetes remote, port hasn't init, try to init");
-            gRPCConfigService service = manager.find(CoreModule.NAME).provider().getService(gRPCConfigService.class);
-            port = service.getPort();
-            logger.debug("Query kubernetes remote, port is set at {}", port);
-            cache.values().forEach(instance -> instance.getAddress().setPort(port));
+            // Use lock mechanism to avoid concurrency conflict with `generateRemoteNodes`.
+            portSetLock.lock();
+            try {
+                logger.debug("Query kubernetes remote, port hasn't init, try to init");
+                gRPCConfigService service = manager.find(CoreModule.NAME).provider().getService(gRPCConfigService.class);
+                port = service.getPort();
+                logger.debug("Query kubernetes remote, port is set at {}", port);
+                cache.values().forEach(instance -> instance.getAddress().setPort(port));
+            } finally {
+                portSetLock.unlock();
+            }
         }
         logger.debug("Query kubernetes remote nodes: {}", cache);
         return Lists.newArrayList(cache.values());