You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@skywalking.apache.org by wu...@apache.org on 2019/03/11 12:52:56 UTC

[incubator-skywalking] branch k8s-Coordinator-role created (now 8c8f39d)

This is an automated email from the ASF dual-hosted git repository.

wusheng pushed a change to branch k8s-Coordinator-role
in repository https://gitbox.apache.org/repos/asf/incubator-skywalking.git.


      at 8c8f39d  Make k8s-Coordinator back in role mode.

This branch includes the following new commits:

     new 8c8f39d  Make k8s-Coordinator back in role mode.

The 1 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.



[incubator-skywalking] 01/01: Make k8s-Coordinator back in role mode.

Posted by wu...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

wusheng pushed a commit to branch k8s-Coordinator-role
in repository https://gitbox.apache.org/repos/asf/incubator-skywalking.git

commit 8c8f39d0f5c5f347b628ff55ca368f6606acec0d
Author: Wu Sheng <wu...@foxmail.com>
AuthorDate: Mon Mar 11 20:52:44 2019 +0800

    Make k8s-Coordinator back in role mode.
---
 .../ClusterModuleKubernetesProvider.java           |  2 +-
 .../plugin/kubernetes/KubernetesCoordinator.java   | 26 ++++++++++++----
 .../kubernetes/KubernetesCoordinatorTest.java      |  8 ++---
 .../oap/server/core/CoreModuleConfig.java          |  2 +-
 .../oap/server/core/CoreModuleProvider.java        |  3 +-
 .../oap/server/core/config/gRPCConfigService.java  | 36 ++++++++++++++++++++++
 .../oap/server/core/remote/client/Address.java     |  4 +--
 7 files changed, 66 insertions(+), 15 deletions(-)

diff --git a/oap-server/server-cluster-plugin/cluster-kubernetes-plugin/src/main/java/org/apache/skywalking/oap/server/cluster/plugin/kubernetes/ClusterModuleKubernetesProvider.java b/oap-server/server-cluster-plugin/cluster-kubernetes-plugin/src/main/java/org/apache/skywalking/oap/server/cluster/plugin/kubernetes/ClusterModuleKubernetesProvider.java
index f1d0d07..4545e84 100644
--- a/oap-server/server-cluster-plugin/cluster-kubernetes-plugin/src/main/java/org/apache/skywalking/oap/server/cluster/plugin/kubernetes/ClusterModuleKubernetesProvider.java
+++ b/oap-server/server-cluster-plugin/cluster-kubernetes-plugin/src/main/java/org/apache/skywalking/oap/server/cluster/plugin/kubernetes/ClusterModuleKubernetesProvider.java
@@ -53,7 +53,7 @@ public class ClusterModuleKubernetesProvider extends ModuleProvider {
     }
 
     @Override public void prepare() throws ServiceNotProvidedException {
-        KubernetesCoordinator coordinator = new KubernetesCoordinator(
+        KubernetesCoordinator coordinator = new KubernetesCoordinator(getManager(),
             new NamespacedPodListWatch(config.getNamespace(), config.getLabelSelector(), config.getWatchTimeoutSeconds()),
             new UidEnvSupplier(config.getUidEnvName()));
         this.registerServiceImplementation(ClusterRegister.class, coordinator);
diff --git a/oap-server/server-cluster-plugin/cluster-kubernetes-plugin/src/main/java/org/apache/skywalking/oap/server/cluster/plugin/kubernetes/KubernetesCoordinator.java b/oap-server/server-cluster-plugin/cluster-kubernetes-plugin/src/main/java/org/apache/skywalking/oap/server/cluster/plugin/kubernetes/KubernetesCoordinator.java
index 730f4b2..3d6dee5 100644
--- a/oap-server/server-cluster-plugin/cluster-kubernetes-plugin/src/main/java/org/apache/skywalking/oap/server/cluster/plugin/kubernetes/KubernetesCoordinator.java
+++ b/oap-server/server-cluster-plugin/cluster-kubernetes-plugin/src/main/java/org/apache/skywalking/oap/server/cluster/plugin/kubernetes/KubernetesCoordinator.java
@@ -24,14 +24,17 @@ import java.util.*;
 import java.util.concurrent.*;
 import java.util.function.Supplier;
 import javax.annotation.Nullable;
+import org.apache.skywalking.oap.server.core.CoreModule;
 import org.apache.skywalking.oap.server.core.cluster.*;
+import org.apache.skywalking.oap.server.core.config.gRPCConfigService;
 import org.apache.skywalking.oap.server.core.remote.client.Address;
+import org.apache.skywalking.oap.server.library.module.ModuleManager;
 import org.apache.skywalking.oap.server.telemetry.api.TelemetryRelatedContext;
 import org.slf4j.*;
 
 /**
- * Read collector pod info from api-server of kubernetes, then using all containerIp list to
- * construct the list of {@link RemoteInstance}.
+ * Read collector pod info from api-server of kubernetes, then using all containerIp list to construct the list of
+ * {@link RemoteInstance}.
  *
  * @author gaohongtao
  */
@@ -39,24 +42,28 @@ public class KubernetesCoordinator implements ClusterRegister, ClusterNodesQuery
 
     private static final Logger logger = LoggerFactory.getLogger(KubernetesCoordinator.class);
 
+    private final ModuleManager manager;
+
     private final String uid;
 
     private final Map<String, RemoteInstance> cache = new ConcurrentHashMap<>();
 
     private final ReusableWatch<Event> watch;
 
-    private int port;
+    private volatile int port = -1;
 
-    KubernetesCoordinator(final ReusableWatch<Event> watch, final Supplier<String> uidSupplier) {
+    KubernetesCoordinator(ModuleManager manager,
+        final ReusableWatch<Event> watch, final Supplier<String> uidSupplier) {
+        this.manager = manager;
         this.watch = watch;
         this.uid = uidSupplier.get();
         TelemetryRelatedContext.INSTANCE.setId(uid);
+        submitTask(MoreExecutors.listeningDecorator(Executors.newSingleThreadExecutor(new ThreadFactoryBuilder()
+            .setDaemon(true).setNameFormat("Kubernetes-ApiServer-%s").build())));
     }
 
     @Override public void registerRemote(RemoteInstance remoteInstance) throws ServiceRegisterException {
         this.port = remoteInstance.getAddress().getPort();
-        submitTask(MoreExecutors.listeningDecorator(Executors.newSingleThreadExecutor(new ThreadFactoryBuilder()
-            .setDaemon(true).setNameFormat("Kubernetes-ApiServer-%s").build())));
     }
 
     private void submitTask(final ListeningExecutorService service) {
@@ -102,6 +109,13 @@ public class KubernetesCoordinator implements ClusterRegister, ClusterNodesQuery
     }
 
     @Override public List<RemoteInstance> queryRemoteNodes() {
+        if (port == -1) {
+            logger.debug("Query kubernetes remote, port hasn't init, try to init");
+            gRPCConfigService service = manager.find(CoreModule.NAME).provider().getService(gRPCConfigService.class);
+            port = service.getPort();
+            logger.debug("Query kubernetes remote, port is set at {}", port);
+            cache.values().forEach(instance -> instance.getAddress().setPort(port));
+        }
         logger.debug("Query kubernetes remote nodes: {}", cache);
         return Lists.newArrayList(cache.values());
     }
diff --git a/oap-server/server-cluster-plugin/cluster-kubernetes-plugin/src/test/java/org/apache/skywalking/oap/server/cluster/plugin/kubernetes/KubernetesCoordinatorTest.java b/oap-server/server-cluster-plugin/cluster-kubernetes-plugin/src/test/java/org/apache/skywalking/oap/server/cluster/plugin/kubernetes/KubernetesCoordinatorTest.java
index 4fd2699..78eb756 100644
--- a/oap-server/server-cluster-plugin/cluster-kubernetes-plugin/src/test/java/org/apache/skywalking/oap/server/cluster/plugin/kubernetes/KubernetesCoordinatorTest.java
+++ b/oap-server/server-cluster-plugin/cluster-kubernetes-plugin/src/test/java/org/apache/skywalking/oap/server/cluster/plugin/kubernetes/KubernetesCoordinatorTest.java
@@ -33,7 +33,7 @@ public class KubernetesCoordinatorTest {
     @Test
     public void assertAdded() throws InterruptedException {
         PlainWatch watch = PlainWatch.create(2, "ADDED", "1", "10.0.0.1", "ADDED", "2", "10.0.0.2");
-        coordinator = new KubernetesCoordinator(watch, () -> "1");
+        coordinator = new KubernetesCoordinator(getManager(), watch, () -> "1");
         coordinator.registerRemote(new RemoteInstance(new Address("0.0.0.0", 8454, true)));
         watch.await();
         assertThat(coordinator.queryRemoteNodes().size(), is(2));
@@ -43,7 +43,7 @@ public class KubernetesCoordinatorTest {
     @Test
     public void assertModified() throws InterruptedException {
         PlainWatch watch = PlainWatch.create(3, "ADDED", "1", "10.0.0.1", "ADDED", "2", "10.0.0.2", "MODIFIED", "1", "10.0.0.3");
-        coordinator = new KubernetesCoordinator(watch, () -> "1");
+        coordinator = new KubernetesCoordinator(getManager(), watch, () -> "1");
         coordinator.registerRemote(new RemoteInstance(new Address("0.0.0.0", 8454, true)));
         watch.await();
         assertThat(coordinator.queryRemoteNodes().size(), is(2));
@@ -53,7 +53,7 @@ public class KubernetesCoordinatorTest {
     @Test
     public void assertDeleted() throws InterruptedException {
         PlainWatch watch = PlainWatch.create(3, "ADDED", "1", "10.0.0.1", "ADDED", "2", "10.0.0.2", "DELETED", "2", "10.0.0.2");
-        coordinator = new KubernetesCoordinator(watch, () -> "1");
+        coordinator = new KubernetesCoordinator(getManager(), watch, () -> "1");
         coordinator.registerRemote(new RemoteInstance(new Address("0.0.0.0", 8454, true)));
         watch.await();
         assertThat(coordinator.queryRemoteNodes().size(), is(1));
@@ -63,7 +63,7 @@ public class KubernetesCoordinatorTest {
     @Test
     public void assertError() throws InterruptedException {
         PlainWatch watch = PlainWatch.create(3, "ADDED", "1", "10.0.0.1", "ERROR", "X", "10.0.0.2", "ADDED", "2", "10.0.0.2");
-        coordinator = new KubernetesCoordinator(watch, () -> "1");
+        coordinator = new KubernetesCoordinator(getManager(), watch, () -> "1");
         coordinator.registerRemote(new RemoteInstance(new Address("0.0.0.0", 8454, true)));
         watch.await();
         assertThat(coordinator.queryRemoteNodes().size(), is(2));
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/CoreModuleConfig.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/CoreModuleConfig.java
index 35e3b49..31b59cd 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/CoreModuleConfig.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/CoreModuleConfig.java
@@ -27,7 +27,7 @@ import org.apache.skywalking.oap.server.library.module.ModuleConfig;
  */
 @Getter
 public class CoreModuleConfig extends ModuleConfig {
-    @Setter private String role;
+    @Setter private String role = "Mixed";
     @Setter private String nameSpace;
     @Setter private String restHost;
     @Setter private int restPort;
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/CoreModuleProvider.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/CoreModuleProvider.java
index 52063c0..222afea 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/CoreModuleProvider.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/CoreModuleProvider.java
@@ -106,6 +106,7 @@ public class CoreModuleProvider extends ModuleProvider {
         jettyServer = new JettyServer(moduleConfig.getRestHost(), moduleConfig.getRestPort(), moduleConfig.getRestContextPath(), moduleConfig.getJettySelectors());
         jettyServer.initialize();
 
+        this.registerServiceImplementation(gRPCConfigService.class, new gRPCConfigService(moduleConfig.getGRPCHost(), moduleConfig.getGRPCPort()));
         this.registerServiceImplementation(DownsamplingConfigService.class, new DownsamplingConfigService(moduleConfig.getDownsampling()));
 
         this.registerServiceImplementation(GRPCHandlerRegister.class, new GRPCHandlerRegisterImpl(grpcServer));
@@ -176,7 +177,7 @@ public class CoreModuleProvider extends ModuleProvider {
             throw new ModuleStartException(e.getMessage(), e);
         }
 
-        if (CoreModuleConfig.Role.Mixed.name().equals(moduleConfig.getRole()) || CoreModuleConfig.Role.Aggregator.name().equals(moduleConfig.getRole())) {
+        if (CoreModuleConfig.Role.Mixed.name().equalsIgnoreCase(moduleConfig.getRole()) || CoreModuleConfig.Role.Aggregator.name().equalsIgnoreCase(moduleConfig.getRole())) {
             RemoteInstance gRPCServerInstance = new RemoteInstance(new Address(moduleConfig.getGRPCHost(), moduleConfig.getGRPCPort(), true));
             this.getManager().find(ClusterModule.NAME).provider().getService(ClusterRegister.class).registerRemote(gRPCServerInstance);
         }
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/config/gRPCConfigService.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/config/gRPCConfigService.java
new file mode 100644
index 0000000..d5a1d15
--- /dev/null
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/config/gRPCConfigService.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.server.core.config;
+
+import lombok.Getter;
+import org.apache.skywalking.oap.server.library.module.Service;
+
+/**
+ * @author wusheng
+ */
+@Getter
+public class gRPCConfigService implements Service {
+    private String host;
+    private int port;
+
+    public gRPCConfigService(String host, int port) {
+        this.host = host;
+        this.port = port;
+    }
+}
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/client/Address.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/client/Address.java
index 4e5d0ce..4c4b8da 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/client/Address.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/client/Address.java
@@ -27,8 +27,8 @@ import org.apache.skywalking.oap.server.core.Const;
 @Getter
 public class Address implements Comparable<Address> {
     private final String host;
-    private final int port;
-    @Setter private boolean isSelf;
+    @Setter private volatile int port;
+    @Setter private volatile boolean isSelf;
 
     public Address(String host, int port, boolean isSelf) {
         this.host = host;