You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@skywalking.apache.org by wu...@apache.org on 2019/03/11 13:24:06 UTC

[incubator-skywalking] branch k8s-Coordinator-role updated: Fix text cases.

This is an automated email from the ASF dual-hosted git repository.

wusheng pushed a commit to branch k8s-Coordinator-role
in repository https://gitbox.apache.org/repos/asf/incubator-skywalking.git


The following commit(s) were added to refs/heads/k8s-Coordinator-role by this push:
     new e46134a  Fix text cases.
e46134a is described below

commit e46134a12f328452a0b1678ea366dc80397ee04a
Author: Wu Sheng <wu...@foxmail.com>
AuthorDate: Mon Mar 11 21:23:57 2019 +0800

    Fix text cases.
---
 .../cluster-kubernetes-plugin/pom.xml              |  6 +++
 .../plugin/kubernetes/KubernetesCoordinator.java   |  6 +--
 .../kubernetes/KubernetesCoordinatorTest.java      | 51 ++++++++++++++++++++++
 3 files changed, 60 insertions(+), 3 deletions(-)

diff --git a/oap-server/server-cluster-plugin/cluster-kubernetes-plugin/pom.xml b/oap-server/server-cluster-plugin/cluster-kubernetes-plugin/pom.xml
index 2eb1592..af12c92 100644
--- a/oap-server/server-cluster-plugin/cluster-kubernetes-plugin/pom.xml
+++ b/oap-server/server-cluster-plugin/cluster-kubernetes-plugin/pom.xml
@@ -38,5 +38,11 @@
             <groupId>io.kubernetes</groupId>
             <artifactId>client-java</artifactId>
         </dependency>
+        <dependency>
+            <groupId>org.apache.skywalking</groupId>
+            <artifactId>server-testing</artifactId>
+            <version>${project.version}</version>
+            <scope>test</scope>
+        </dependency>
     </dependencies>
 </project>
\ No newline at end of file
diff --git a/oap-server/server-cluster-plugin/cluster-kubernetes-plugin/src/main/java/org/apache/skywalking/oap/server/cluster/plugin/kubernetes/KubernetesCoordinator.java b/oap-server/server-cluster-plugin/cluster-kubernetes-plugin/src/main/java/org/apache/skywalking/oap/server/cluster/plugin/kubernetes/KubernetesCoordinator.java
index af11b06..7e7b456 100644
--- a/oap-server/server-cluster-plugin/cluster-kubernetes-plugin/src/main/java/org/apache/skywalking/oap/server/cluster/plugin/kubernetes/KubernetesCoordinator.java
+++ b/oap-server/server-cluster-plugin/cluster-kubernetes-plugin/src/main/java/org/apache/skywalking/oap/server/cluster/plugin/kubernetes/KubernetesCoordinator.java
@@ -29,7 +29,7 @@ import org.apache.skywalking.oap.server.core.CoreModule;
 import org.apache.skywalking.oap.server.core.cluster.*;
 import org.apache.skywalking.oap.server.core.config.ConfigService;
 import org.apache.skywalking.oap.server.core.remote.client.Address;
-import org.apache.skywalking.oap.server.library.module.ModuleManager;
+import org.apache.skywalking.oap.server.library.module.ModuleDefineHolder;
 import org.apache.skywalking.oap.server.telemetry.api.TelemetryRelatedContext;
 import org.slf4j.*;
 
@@ -43,7 +43,7 @@ public class KubernetesCoordinator implements ClusterRegister, ClusterNodesQuery
 
     private static final Logger logger = LoggerFactory.getLogger(KubernetesCoordinator.class);
 
-    private final ModuleManager manager;
+    private final ModuleDefineHolder manager;
 
     private final String uid;
 
@@ -55,7 +55,7 @@ public class KubernetesCoordinator implements ClusterRegister, ClusterNodesQuery
 
     private volatile int port = -1;
 
-    KubernetesCoordinator(ModuleManager manager,
+    KubernetesCoordinator(ModuleDefineHolder manager,
         final ReusableWatch<Event> watch, final Supplier<String> uidSupplier) {
         this.manager = manager;
         this.watch = watch;
diff --git a/oap-server/server-cluster-plugin/cluster-kubernetes-plugin/src/test/java/org/apache/skywalking/oap/server/cluster/plugin/kubernetes/KubernetesCoordinatorTest.java b/oap-server/server-cluster-plugin/cluster-kubernetes-plugin/src/test/java/org/apache/skywalking/oap/server/cluster/plugin/kubernetes/KubernetesCoordinatorTest.java
index 78eb756..982f4eb 100644
--- a/oap-server/server-cluster-plugin/cluster-kubernetes-plugin/src/test/java/org/apache/skywalking/oap/server/cluster/plugin/kubernetes/KubernetesCoordinatorTest.java
+++ b/oap-server/server-cluster-plugin/cluster-kubernetes-plugin/src/test/java/org/apache/skywalking/oap/server/cluster/plugin/kubernetes/KubernetesCoordinatorTest.java
@@ -19,12 +19,18 @@
 package org.apache.skywalking.oap.server.cluster.plugin.kubernetes;
 
 import org.apache.skywalking.oap.server.cluster.plugin.kubernetes.fixture.PlainWatch;
+import org.apache.skywalking.oap.server.core.*;
 import org.apache.skywalking.oap.server.core.cluster.RemoteInstance;
+import org.apache.skywalking.oap.server.core.config.ConfigService;
 import org.apache.skywalking.oap.server.core.remote.client.Address;
+import org.apache.skywalking.oap.server.library.module.ModuleDefineHolder;
+import org.apache.skywalking.oap.server.testing.module.*;
 import org.junit.Test;
+import org.mockito.Mockito;
 
 import static org.hamcrest.core.Is.is;
 import static org.junit.Assert.assertThat;
+import static org.mockito.Mockito.when;
 
 public class KubernetesCoordinatorTest {
 
@@ -34,6 +40,7 @@ public class KubernetesCoordinatorTest {
     public void assertAdded() throws InterruptedException {
         PlainWatch watch = PlainWatch.create(2, "ADDED", "1", "10.0.0.1", "ADDED", "2", "10.0.0.2");
         coordinator = new KubernetesCoordinator(getManager(), watch, () -> "1");
+        coordinator.start();
         coordinator.registerRemote(new RemoteInstance(new Address("0.0.0.0", 8454, true)));
         watch.await();
         assertThat(coordinator.queryRemoteNodes().size(), is(2));
@@ -44,6 +51,7 @@ public class KubernetesCoordinatorTest {
     public void assertModified() throws InterruptedException {
         PlainWatch watch = PlainWatch.create(3, "ADDED", "1", "10.0.0.1", "ADDED", "2", "10.0.0.2", "MODIFIED", "1", "10.0.0.3");
         coordinator = new KubernetesCoordinator(getManager(), watch, () -> "1");
+        coordinator.start();
         coordinator.registerRemote(new RemoteInstance(new Address("0.0.0.0", 8454, true)));
         watch.await();
         assertThat(coordinator.queryRemoteNodes().size(), is(2));
@@ -54,6 +62,7 @@ public class KubernetesCoordinatorTest {
     public void assertDeleted() throws InterruptedException {
         PlainWatch watch = PlainWatch.create(3, "ADDED", "1", "10.0.0.1", "ADDED", "2", "10.0.0.2", "DELETED", "2", "10.0.0.2");
         coordinator = new KubernetesCoordinator(getManager(), watch, () -> "1");
+        coordinator.start();
         coordinator.registerRemote(new RemoteInstance(new Address("0.0.0.0", 8454, true)));
         watch.await();
         assertThat(coordinator.queryRemoteNodes().size(), is(1));
@@ -64,9 +73,51 @@ public class KubernetesCoordinatorTest {
     public void assertError() throws InterruptedException {
         PlainWatch watch = PlainWatch.create(3, "ADDED", "1", "10.0.0.1", "ERROR", "X", "10.0.0.2", "ADDED", "2", "10.0.0.2");
         coordinator = new KubernetesCoordinator(getManager(), watch, () -> "1");
+        coordinator.start();
         coordinator.registerRemote(new RemoteInstance(new Address("0.0.0.0", 8454, true)));
         watch.await();
         assertThat(coordinator.queryRemoteNodes().size(), is(2));
         assertThat(coordinator.queryRemoteNodes().stream().filter(instance -> instance.getAddress().isSelf()).findFirst().get().getAddress().getHost(), is("10.0.0.1"));
     }
+
+    @Test
+    public void assertModifiedInReceiverRole() throws InterruptedException {
+        PlainWatch watch = PlainWatch.create(3, "ADDED", "1", "10.0.0.1", "ADDED", "2", "10.0.0.2", "MODIFIED", "1", "10.0.0.3");
+        coordinator = new KubernetesCoordinator(getManager(), watch, () -> "1");
+        coordinator.start();
+        watch.await();
+        assertThat(coordinator.queryRemoteNodes().size(), is(2));
+        assertThat(coordinator.queryRemoteNodes().stream().filter(instance -> instance.getAddress().isSelf()).findFirst().get().getAddress().getHost(), is("10.0.0.3"));
+    }
+
+    @Test
+    public void assertDeletedInReceiverRole() throws InterruptedException {
+        PlainWatch watch = PlainWatch.create(3, "ADDED", "1", "10.0.0.1", "ADDED", "2", "10.0.0.2", "DELETED", "2", "10.0.0.2");
+        coordinator = new KubernetesCoordinator(getManager(), watch, () -> "1");
+        coordinator.start();
+        watch.await();
+        assertThat(coordinator.queryRemoteNodes().size(), is(1));
+        assertThat(coordinator.queryRemoteNodes().stream().filter(instance -> instance.getAddress().isSelf()).findFirst().get().getAddress().getHost(), is("10.0.0.1"));
+    }
+
+    @Test
+    public void assertErrorInReceiverRole() throws InterruptedException {
+        PlainWatch watch = PlainWatch.create(3, "ADDED", "1", "10.0.0.1", "ERROR", "X", "10.0.0.2", "ADDED", "2", "10.0.0.2");
+        coordinator = new KubernetesCoordinator(getManager(), watch, () -> "1");
+        coordinator.start();
+        watch.await();
+        assertThat(coordinator.queryRemoteNodes().size(), is(2));
+        assertThat(coordinator.queryRemoteNodes().stream().filter(instance -> instance.getAddress().isSelf()).findFirst().get().getAddress().getHost(), is("10.0.0.1"));
+    }
+
+    public ModuleDefineHolder getManager() {
+        ModuleManagerTesting moduleManagerTesting = new ModuleManagerTesting();
+        ModuleDefineTesting coreModuleDefine = new ModuleDefineTesting();
+        moduleManagerTesting.put(CoreModule.NAME, coreModuleDefine);
+        CoreModuleConfig config = Mockito.mock(CoreModuleConfig.class);
+        when(config.getGRPCHost()).thenReturn("127.0.0.1");
+        when(config.getGRPCPort()).thenReturn(8454);
+        coreModuleDefine.provider().registerServiceImplementation(ConfigService.class, new ConfigService(config));
+        return moduleManagerTesting;
+    }
 }
\ No newline at end of file