You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@skywalking.apache.org by wu...@apache.org on 2019/03/11 13:24:06 UTC
[incubator-skywalking] branch k8s-Coordinator-role updated: Fix
text cases.
This is an automated email from the ASF dual-hosted git repository.
wusheng pushed a commit to branch k8s-Coordinator-role
in repository https://gitbox.apache.org/repos/asf/incubator-skywalking.git
The following commit(s) were added to refs/heads/k8s-Coordinator-role by this push:
new e46134a Fix text cases.
e46134a is described below
commit e46134a12f328452a0b1678ea366dc80397ee04a
Author: Wu Sheng <wu...@foxmail.com>
AuthorDate: Mon Mar 11 21:23:57 2019 +0800
Fix text cases.
---
.../cluster-kubernetes-plugin/pom.xml | 6 +++
.../plugin/kubernetes/KubernetesCoordinator.java | 6 +--
.../kubernetes/KubernetesCoordinatorTest.java | 51 ++++++++++++++++++++++
3 files changed, 60 insertions(+), 3 deletions(-)
diff --git a/oap-server/server-cluster-plugin/cluster-kubernetes-plugin/pom.xml b/oap-server/server-cluster-plugin/cluster-kubernetes-plugin/pom.xml
index 2eb1592..af12c92 100644
--- a/oap-server/server-cluster-plugin/cluster-kubernetes-plugin/pom.xml
+++ b/oap-server/server-cluster-plugin/cluster-kubernetes-plugin/pom.xml
@@ -38,5 +38,11 @@
<groupId>io.kubernetes</groupId>
<artifactId>client-java</artifactId>
</dependency>
+ <dependency>
+ <groupId>org.apache.skywalking</groupId>
+ <artifactId>server-testing</artifactId>
+ <version>${project.version}</version>
+ <scope>test</scope>
+ </dependency>
</dependencies>
</project>
\ No newline at end of file
diff --git a/oap-server/server-cluster-plugin/cluster-kubernetes-plugin/src/main/java/org/apache/skywalking/oap/server/cluster/plugin/kubernetes/KubernetesCoordinator.java b/oap-server/server-cluster-plugin/cluster-kubernetes-plugin/src/main/java/org/apache/skywalking/oap/server/cluster/plugin/kubernetes/KubernetesCoordinator.java
index af11b06..7e7b456 100644
--- a/oap-server/server-cluster-plugin/cluster-kubernetes-plugin/src/main/java/org/apache/skywalking/oap/server/cluster/plugin/kubernetes/KubernetesCoordinator.java
+++ b/oap-server/server-cluster-plugin/cluster-kubernetes-plugin/src/main/java/org/apache/skywalking/oap/server/cluster/plugin/kubernetes/KubernetesCoordinator.java
@@ -29,7 +29,7 @@ import org.apache.skywalking.oap.server.core.CoreModule;
import org.apache.skywalking.oap.server.core.cluster.*;
import org.apache.skywalking.oap.server.core.config.ConfigService;
import org.apache.skywalking.oap.server.core.remote.client.Address;
-import org.apache.skywalking.oap.server.library.module.ModuleManager;
+import org.apache.skywalking.oap.server.library.module.ModuleDefineHolder;
import org.apache.skywalking.oap.server.telemetry.api.TelemetryRelatedContext;
import org.slf4j.*;
@@ -43,7 +43,7 @@ public class KubernetesCoordinator implements ClusterRegister, ClusterNodesQuery
private static final Logger logger = LoggerFactory.getLogger(KubernetesCoordinator.class);
- private final ModuleManager manager;
+ private final ModuleDefineHolder manager;
private final String uid;
@@ -55,7 +55,7 @@ public class KubernetesCoordinator implements ClusterRegister, ClusterNodesQuery
private volatile int port = -1;
- KubernetesCoordinator(ModuleManager manager,
+ KubernetesCoordinator(ModuleDefineHolder manager,
final ReusableWatch<Event> watch, final Supplier<String> uidSupplier) {
this.manager = manager;
this.watch = watch;
diff --git a/oap-server/server-cluster-plugin/cluster-kubernetes-plugin/src/test/java/org/apache/skywalking/oap/server/cluster/plugin/kubernetes/KubernetesCoordinatorTest.java b/oap-server/server-cluster-plugin/cluster-kubernetes-plugin/src/test/java/org/apache/skywalking/oap/server/cluster/plugin/kubernetes/KubernetesCoordinatorTest.java
index 78eb756..982f4eb 100644
--- a/oap-server/server-cluster-plugin/cluster-kubernetes-plugin/src/test/java/org/apache/skywalking/oap/server/cluster/plugin/kubernetes/KubernetesCoordinatorTest.java
+++ b/oap-server/server-cluster-plugin/cluster-kubernetes-plugin/src/test/java/org/apache/skywalking/oap/server/cluster/plugin/kubernetes/KubernetesCoordinatorTest.java
@@ -19,12 +19,18 @@
package org.apache.skywalking.oap.server.cluster.plugin.kubernetes;
import org.apache.skywalking.oap.server.cluster.plugin.kubernetes.fixture.PlainWatch;
+import org.apache.skywalking.oap.server.core.*;
import org.apache.skywalking.oap.server.core.cluster.RemoteInstance;
+import org.apache.skywalking.oap.server.core.config.ConfigService;
import org.apache.skywalking.oap.server.core.remote.client.Address;
+import org.apache.skywalking.oap.server.library.module.ModuleDefineHolder;
+import org.apache.skywalking.oap.server.testing.module.*;
import org.junit.Test;
+import org.mockito.Mockito;
import static org.hamcrest.core.Is.is;
import static org.junit.Assert.assertThat;
+import static org.mockito.Mockito.when;
public class KubernetesCoordinatorTest {
@@ -34,6 +40,7 @@ public class KubernetesCoordinatorTest {
public void assertAdded() throws InterruptedException {
PlainWatch watch = PlainWatch.create(2, "ADDED", "1", "10.0.0.1", "ADDED", "2", "10.0.0.2");
coordinator = new KubernetesCoordinator(getManager(), watch, () -> "1");
+ coordinator.start();
coordinator.registerRemote(new RemoteInstance(new Address("0.0.0.0", 8454, true)));
watch.await();
assertThat(coordinator.queryRemoteNodes().size(), is(2));
@@ -44,6 +51,7 @@ public class KubernetesCoordinatorTest {
public void assertModified() throws InterruptedException {
PlainWatch watch = PlainWatch.create(3, "ADDED", "1", "10.0.0.1", "ADDED", "2", "10.0.0.2", "MODIFIED", "1", "10.0.0.3");
coordinator = new KubernetesCoordinator(getManager(), watch, () -> "1");
+ coordinator.start();
coordinator.registerRemote(new RemoteInstance(new Address("0.0.0.0", 8454, true)));
watch.await();
assertThat(coordinator.queryRemoteNodes().size(), is(2));
@@ -54,6 +62,7 @@ public class KubernetesCoordinatorTest {
public void assertDeleted() throws InterruptedException {
PlainWatch watch = PlainWatch.create(3, "ADDED", "1", "10.0.0.1", "ADDED", "2", "10.0.0.2", "DELETED", "2", "10.0.0.2");
coordinator = new KubernetesCoordinator(getManager(), watch, () -> "1");
+ coordinator.start();
coordinator.registerRemote(new RemoteInstance(new Address("0.0.0.0", 8454, true)));
watch.await();
assertThat(coordinator.queryRemoteNodes().size(), is(1));
@@ -64,9 +73,51 @@ public class KubernetesCoordinatorTest {
public void assertError() throws InterruptedException {
PlainWatch watch = PlainWatch.create(3, "ADDED", "1", "10.0.0.1", "ERROR", "X", "10.0.0.2", "ADDED", "2", "10.0.0.2");
coordinator = new KubernetesCoordinator(getManager(), watch, () -> "1");
+ coordinator.start();
coordinator.registerRemote(new RemoteInstance(new Address("0.0.0.0", 8454, true)));
watch.await();
assertThat(coordinator.queryRemoteNodes().size(), is(2));
assertThat(coordinator.queryRemoteNodes().stream().filter(instance -> instance.getAddress().isSelf()).findFirst().get().getAddress().getHost(), is("10.0.0.1"));
}
+
+ @Test
+ public void assertModifiedInReceiverRole() throws InterruptedException {
+ PlainWatch watch = PlainWatch.create(3, "ADDED", "1", "10.0.0.1", "ADDED", "2", "10.0.0.2", "MODIFIED", "1", "10.0.0.3");
+ coordinator = new KubernetesCoordinator(getManager(), watch, () -> "1");
+ coordinator.start();
+ watch.await();
+ assertThat(coordinator.queryRemoteNodes().size(), is(2));
+ assertThat(coordinator.queryRemoteNodes().stream().filter(instance -> instance.getAddress().isSelf()).findFirst().get().getAddress().getHost(), is("10.0.0.3"));
+ }
+
+ @Test
+ public void assertDeletedInReceiverRole() throws InterruptedException {
+ PlainWatch watch = PlainWatch.create(3, "ADDED", "1", "10.0.0.1", "ADDED", "2", "10.0.0.2", "DELETED", "2", "10.0.0.2");
+ coordinator = new KubernetesCoordinator(getManager(), watch, () -> "1");
+ coordinator.start();
+ watch.await();
+ assertThat(coordinator.queryRemoteNodes().size(), is(1));
+ assertThat(coordinator.queryRemoteNodes().stream().filter(instance -> instance.getAddress().isSelf()).findFirst().get().getAddress().getHost(), is("10.0.0.1"));
+ }
+
+ @Test
+ public void assertErrorInReceiverRole() throws InterruptedException {
+ PlainWatch watch = PlainWatch.create(3, "ADDED", "1", "10.0.0.1", "ERROR", "X", "10.0.0.2", "ADDED", "2", "10.0.0.2");
+ coordinator = new KubernetesCoordinator(getManager(), watch, () -> "1");
+ coordinator.start();
+ watch.await();
+ assertThat(coordinator.queryRemoteNodes().size(), is(2));
+ assertThat(coordinator.queryRemoteNodes().stream().filter(instance -> instance.getAddress().isSelf()).findFirst().get().getAddress().getHost(), is("10.0.0.1"));
+ }
+
+ public ModuleDefineHolder getManager() {
+ ModuleManagerTesting moduleManagerTesting = new ModuleManagerTesting();
+ ModuleDefineTesting coreModuleDefine = new ModuleDefineTesting();
+ moduleManagerTesting.put(CoreModule.NAME, coreModuleDefine);
+ CoreModuleConfig config = Mockito.mock(CoreModuleConfig.class);
+ when(config.getGRPCHost()).thenReturn("127.0.0.1");
+ when(config.getGRPCPort()).thenReturn(8454);
+ coreModuleDefine.provider().registerServiceImplementation(ConfigService.class, new ConfigService(config));
+ return moduleManagerTesting;
+ }
}
\ No newline at end of file