You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@skywalking.apache.org by wu...@apache.org on 2019/05/06 01:11:23 UTC
[skywalking] branch master updated: 1. Add unit tests for
cluster-consul-plugin; (#2572)
This is an automated email from the ASF dual-hosted git repository.
wusheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/skywalking.git
The following commit(s) were added to refs/heads/master by this push:
new a4eb10d 1. Add unit tests for cluster-consul-plugin; (#2572)
a4eb10d is described below
commit a4eb10d1dac20be1cd6fcf734a3e6817530c00c4
Author: Ming Deng <mi...@qq.com>
AuthorDate: Mon May 6 09:11:15 2019 +0800
1. Add unit tests for cluster-consul-plugin; (#2572)
2. Add unit tests for cluster-zookeeper-plugin;
3. Add unit tests for cluster-kubernetes-plugin;
---
.../plugin/consul/ClusterModuleConsulProvider.java | 11 +-
.../consul/ClusterModuleConsulProviderTest.java | 142 +++++++++++++++
.../plugin/consul/ConsulCoordinatorTest.java | 201 +++++++++++++++++++++
.../dependencies/NamespacedPodListWatchTest.java | 166 +++++++++++++++++
.../plugin/zookeeper/NodeNameBuilderTest.java | 36 ++++
.../plugin/zookeeper/ZookeeperCoordinatorTest.java | 101 +++++++++++
6 files changed, 654 insertions(+), 3 deletions(-)
diff --git a/oap-server/server-cluster-plugin/cluster-consul-plugin/src/main/java/org/apache/skywalking/oap/server/cluster/plugin/consul/ClusterModuleConsulProvider.java b/oap-server/server-cluster-plugin/cluster-consul-plugin/src/main/java/org/apache/skywalking/oap/server/cluster/plugin/consul/ClusterModuleConsulProvider.java
index 61458bb..d6fd2cc 100644
--- a/oap-server/server-cluster-plugin/cluster-consul-plugin/src/main/java/org/apache/skywalking/oap/server/cluster/plugin/consul/ClusterModuleConsulProvider.java
+++ b/oap-server/server-cluster-plugin/cluster-consul-plugin/src/main/java/org/apache/skywalking/oap/server/cluster/plugin/consul/ClusterModuleConsulProvider.java
@@ -20,6 +20,7 @@ package org.apache.skywalking.oap.server.cluster.plugin.consul;
import com.google.common.net.HostAndPort;
import com.orbitz.consul.Consul;
+import com.orbitz.consul.ConsulException;
import org.apache.skywalking.oap.server.core.CoreModule;
import org.apache.skywalking.oap.server.core.cluster.ClusterModule;
import org.apache.skywalking.oap.server.core.cluster.ClusterNodesQuery;
@@ -71,12 +72,16 @@ public class ClusterModuleConsulProvider extends ModuleProvider {
hostAndPorts.add(HostAndPort.fromParts(address.getHost(), address.getPort()));
}
+ Consul.Builder consulBuilder = Consul.builder()
+// we should set this value or it will be blocked forever
+ .withConnectTimeoutMillis(3000);
+
if (hostAndPorts.size() > 1) {
- client = Consul.builder().withMultipleHostAndPort(hostAndPorts, 5000).build();
+ client = consulBuilder.withMultipleHostAndPort(hostAndPorts, 5000).build();
} else {
- client = Consul.builder().withHostAndPort(hostAndPorts.get(0)).build();
+ client = consulBuilder.withHostAndPort(hostAndPorts.get(0)).build();
}
- } catch (ConnectStringParseException e) {
+ } catch (ConnectStringParseException | ConsulException e) {
throw new ModuleStartException(e.getMessage(), e);
}
diff --git a/oap-server/server-cluster-plugin/cluster-consul-plugin/src/test/java/org/apache/skywalking/oap/server/cluster/plugin/consul/ClusterModuleConsulProviderTest.java b/oap-server/server-cluster-plugin/cluster-consul-plugin/src/test/java/org/apache/skywalking/oap/server/cluster/plugin/consul/ClusterModuleConsulProviderTest.java
new file mode 100644
index 0000000..8a14d43
--- /dev/null
+++ b/oap-server/server-cluster-plugin/cluster-consul-plugin/src/test/java/org/apache/skywalking/oap/server/cluster/plugin/consul/ClusterModuleConsulProviderTest.java
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.skywalking.oap.server.cluster.plugin.consul;
+
+import com.google.common.collect.Lists;
+import com.google.common.net.HostAndPort;
+import com.orbitz.consul.Consul;
+import org.apache.skywalking.oap.server.core.CoreModule;
+import org.apache.skywalking.oap.server.core.cluster.ClusterModule;
+import org.apache.skywalking.oap.server.library.module.ModuleConfig;
+import org.apache.skywalking.oap.server.library.module.ModuleStartException;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.ArgumentCaptor;
+import org.powermock.api.mockito.PowerMockito;
+import org.powermock.core.classloader.annotations.PowerMockIgnore;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+import org.powermock.reflect.Whitebox;
+
+import java.util.Collection;
+import java.util.List;
+
+import static org.junit.Assert.*;
+import static org.mockito.Mockito.*;
+
+/**
+ * Created by dengming, 2019.05.01
+ */
+@RunWith(PowerMockRunner.class)
+@PrepareForTest(Consul.class)
+@PowerMockIgnore("javax.management.*")
+public class ClusterModuleConsulProviderTest {
+
+ private ClusterModuleConsulProvider provider = new ClusterModuleConsulProvider();
+
+ @Test
+ public void name() {
+ assertEquals("consul", provider.name());
+ }
+
+ @Test
+ public void module() {
+ assertEquals(ClusterModule.class, provider.module());
+ }
+
+ @Test
+ public void createConfigBeanIfAbsent() {
+ ModuleConfig moduleConfig = provider.createConfigBeanIfAbsent();
+ assertTrue(moduleConfig instanceof ClusterModuleConsulConfig);
+ }
+
+ @Test(expected = ModuleStartException.class)
+ public void prepareWithNonHost() throws Exception {
+ provider.prepare();
+ }
+
+ @Test
+ @SuppressWarnings("unchecked")
+ public void prepare() throws Exception {
+ ClusterModuleConsulConfig consulConfig = new ClusterModuleConsulConfig();
+ consulConfig.setHostPort("10.0.0.1:1000,10.0.0.2:1001");
+ Whitebox.setInternalState(provider, "config", consulConfig);
+
+ Consul consulClient = mock(Consul.class);
+ Consul.Builder builder = mock(Consul.Builder.class);
+ when(builder.build()).thenReturn(consulClient);
+
+ PowerMockito.mockStatic(Consul.class);
+ when(Consul.builder()).thenReturn(builder);
+ when(builder.withConnectTimeoutMillis(anyLong())).thenReturn(builder);
+
+ when(builder.withMultipleHostAndPort(anyCollection(), anyLong())).thenReturn(builder);
+ provider.prepare();
+
+ ArgumentCaptor<Collection> addressCaptor = ArgumentCaptor.forClass(Collection.class);
+ ArgumentCaptor<Long> timeCaptor = ArgumentCaptor.forClass(long.class);
+ verify(builder).withMultipleHostAndPort(addressCaptor.capture(), timeCaptor.capture());
+
+ List<HostAndPort> address = (List<HostAndPort>) addressCaptor.getValue();
+ assertEquals(2, address.size());
+ assertEquals(Lists.newArrayList(HostAndPort.fromParts("10.0.0.1", 1000),
+ HostAndPort.fromParts("10.0.0.2", 1001)
+ ), address);
+ }
+
+ @Test
+ public void prepareSingle() throws Exception {
+ ClusterModuleConsulConfig consulConfig = new ClusterModuleConsulConfig();
+ consulConfig.setHostPort("10.0.0.1:1000");
+ Whitebox.setInternalState(provider, "config", consulConfig);
+
+ Consul consulClient = mock(Consul.class);
+ Consul.Builder builder = mock(Consul.Builder.class);
+ when(builder.build()).thenReturn(consulClient);
+
+ PowerMockito.mockStatic(Consul.class);
+ when(Consul.builder()).thenReturn(builder);
+ when(builder.withConnectTimeoutMillis(anyLong())).thenCallRealMethod();
+
+ when(builder.withHostAndPort(any())).thenReturn(builder);
+
+ provider.prepare();
+
+ ArgumentCaptor<HostAndPort> hostAndPortArgumentCaptor = ArgumentCaptor.forClass(HostAndPort.class);
+ verify(builder).withHostAndPort(hostAndPortArgumentCaptor.capture());
+
+ HostAndPort address = hostAndPortArgumentCaptor.getValue();
+ assertEquals(HostAndPort.fromParts("10.0.0.1", 1000), address);
+ }
+
+ @Test
+ public void start() {
+ provider.start();
+ }
+
+ @Test
+ public void notifyAfterCompleted() {
+ provider.notifyAfterCompleted();
+ }
+
+ @Test
+ public void requiredModules() {
+ String[] modules = provider.requiredModules();
+ assertArrayEquals(new String[]{CoreModule.NAME}, modules);
+ }
+}
\ No newline at end of file
diff --git a/oap-server/server-cluster-plugin/cluster-consul-plugin/src/test/java/org/apache/skywalking/oap/server/cluster/plugin/consul/ConsulCoordinatorTest.java b/oap-server/server-cluster-plugin/cluster-consul-plugin/src/test/java/org/apache/skywalking/oap/server/cluster/plugin/consul/ConsulCoordinatorTest.java
new file mode 100644
index 0000000..181e003
--- /dev/null
+++ b/oap-server/server-cluster-plugin/cluster-consul-plugin/src/test/java/org/apache/skywalking/oap/server/cluster/plugin/consul/ConsulCoordinatorTest.java
@@ -0,0 +1,201 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.skywalking.oap.server.cluster.plugin.consul;
+
+import com.orbitz.consul.AgentClient;
+import com.orbitz.consul.Consul;
+import com.orbitz.consul.HealthClient;
+import com.orbitz.consul.model.ConsulResponse;
+import com.orbitz.consul.model.agent.Registration;
+import com.orbitz.consul.model.health.Service;
+import com.orbitz.consul.model.health.ServiceHealth;
+import org.apache.skywalking.oap.server.core.cluster.RemoteInstance;
+import org.apache.skywalking.oap.server.core.remote.client.Address;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.ArgumentCaptor;
+
+import java.util.Collections;
+import java.util.LinkedList;
+import java.util.List;
+
+import static org.junit.Assert.*;
+import static org.mockito.Mockito.*;
+
+/**
+ * Created by dengming, 2019.05.02
+ */
+public class ConsulCoordinatorTest {
+
+ private Consul consul = mock(Consul.class);
+
+ private ClusterModuleConsulConfig consulConfig = new ClusterModuleConsulConfig();
+
+ private ConsulCoordinator coordinator;
+
+ private ConsulResponse<List<ServiceHealth>> consulResponse;
+
+ private Address remoteAddress = new Address("10.0.0.1", 1000, false);
+ private Address selfRemoteAddress = new Address("10.0.0.2", 1001, true);
+
+ private Address internalAddress = new Address("10.0.0.3", 1002, false);
+
+ private AgentClient agentClient = mock(AgentClient.class);
+
+ private static final String SERVICE_NAME = "my-service";
+
+ @Before
+ public void setUp() {
+
+ consulConfig.setServiceName(SERVICE_NAME);
+
+ coordinator = new ConsulCoordinator(consulConfig, consul);
+
+ consulResponse = mock(ConsulResponse.class);
+
+ HealthClient healthClient = mock(HealthClient.class);
+ when(healthClient.getHealthyServiceInstances(anyString())).thenReturn(consulResponse);
+
+ when(consul.healthClient()).thenReturn(healthClient);
+ when(consul.agentClient()).thenReturn(agentClient);
+ }
+
+
+ @Test
+ @SuppressWarnings("unchecked")
+ public void queryRemoteNodesWithNonOrEmpty() {
+ when(consulResponse.getResponse()).thenReturn(null, Collections.emptyList());
+ assertEquals(0, coordinator.queryRemoteNodes().size());
+ assertEquals(0, coordinator.queryRemoteNodes().size());
+ }
+
+ @Test
+ public void queryRemoteNodes() {
+ registerSelfRemote();
+ List<ServiceHealth> serviceHealths = mockHealth();
+ when(consulResponse.getResponse()).thenReturn(serviceHealths);
+ List<RemoteInstance> remoteInstances = coordinator.queryRemoteNodes();
+ assertEquals(2, remoteInstances.size());
+
+ RemoteInstance selfInstance = remoteInstances.get(0);
+ velidate(selfRemoteAddress, selfInstance);
+
+ RemoteInstance notSelfInstance = remoteInstances.get(1);
+ velidate(remoteAddress, notSelfInstance);
+ }
+
+ @Test
+ public void queryRemoteNodesWithNullSelf() {
+ List<ServiceHealth> serviceHealths = mockHealth();
+ when(consulResponse.getResponse()).thenReturn(serviceHealths);
+ List<RemoteInstance> remoteInstances = coordinator.queryRemoteNodes();
+ assertTrue(remoteInstances.isEmpty());
+ }
+
+ @Test
+ public void registerRemote() {
+ registerRemote(remoteAddress);
+ }
+
+ @Test
+ public void registerSelfRemote() {
+ registerRemote(selfRemoteAddress);
+ }
+
+ @Test
+ public void registerRemoteUsingInternal() {
+ consulConfig.setInternalComHost(internalAddress.getHost());
+ consulConfig.setInternalComPort(internalAddress.getPort());
+ registerRemote(internalAddress);
+ }
+
+ private void velidate(Address originArress, RemoteInstance instance) {
+ Address instanceAddress = instance.getAddress();
+ assertEquals(originArress.getHost(), instanceAddress.getHost());
+ assertEquals(originArress.getPort(), instanceAddress.getPort());
+ }
+
+ private void registerRemote(Address address) {
+ coordinator.registerRemote(new RemoteInstance(address));
+ Registration registration = afterRegister();
+ verifyRegistration(address, registration);
+ }
+
+ private Registration afterRegister() {
+ ArgumentCaptor<Registration> argumentCaptor = ArgumentCaptor.forClass(Registration.class);
+ verify(agentClient).register(argumentCaptor.capture());
+ return argumentCaptor.getValue();
+ }
+
+ private void verifyRegistration(Address remoteAddress, Registration registration) {
+ assertNotNull(registration);
+ assertEquals(SERVICE_NAME, registration.getName());
+ assertEquals(remoteAddress.getHost() + "_" + remoteAddress.getPort(), registration.getId());
+ assertTrue(registration.getAddress().isPresent());
+ assertEquals(remoteAddress.getHost(), registration.getAddress().get());
+ assertTrue(registration.getPort().isPresent());
+ assertEquals(remoteAddress.getPort(), registration.getPort().get().intValue());
+ assertTrue(registration.getCheck().isPresent());
+ Registration.RegCheck regCheck = registration.getCheck().get();
+ assertTrue(regCheck.getGrpc().isPresent());
+ assertEquals(remoteAddress.getHost() + ":" + remoteAddress.getPort(), regCheck.getGrpc().get());
+ }
+
+ private List<ServiceHealth> mockHealth() {
+ List<ServiceHealth> result = new LinkedList<>();
+ result.add(mockSelfService());
+ result.add(mockNotSelfService());
+ result.add(mockNullServiceAddress());
+ return result;
+ }
+
+ private ServiceHealth mockNotSelfService() {
+ ServiceHealth serviceHealth = mock(ServiceHealth.class);
+ Service service = mock(Service.class);
+
+ when(service.getAddress()).thenReturn(remoteAddress.getHost());
+ when(service.getPort()).thenReturn(remoteAddress.getPort());
+
+ when(serviceHealth.getService()).thenReturn(service);
+
+ return serviceHealth;
+ }
+
+ private ServiceHealth mockSelfService() {
+ ServiceHealth serviceHealth = mock(ServiceHealth.class);
+ Service service = mock(Service.class);
+
+ when(service.getAddress()).thenReturn(selfRemoteAddress.getHost());
+ when(service.getPort()).thenReturn(selfRemoteAddress.getPort());
+
+ when(serviceHealth.getService()).thenReturn(service);
+
+ return serviceHealth;
+ }
+
+ private ServiceHealth mockNullServiceAddress() {
+ ServiceHealth serviceHealth = mock(ServiceHealth.class);
+ Service service = mock(Service.class);
+
+ when(serviceHealth.getService()).thenReturn(service);
+
+ when(service.getAddress()).thenReturn("");
+
+ return serviceHealth;
+ }
+}
\ No newline at end of file
diff --git a/oap-server/server-cluster-plugin/cluster-kubernetes-plugin/src/test/java/org/apache/skywalking/oap/server/cluster/plugin/kubernetes/dependencies/NamespacedPodListWatchTest.java b/oap-server/server-cluster-plugin/cluster-kubernetes-plugin/src/test/java/org/apache/skywalking/oap/server/cluster/plugin/kubernetes/dependencies/NamespacedPodListWatchTest.java
new file mode 100644
index 0000000..71d8b5d
--- /dev/null
+++ b/oap-server/server-cluster-plugin/cluster-kubernetes-plugin/src/test/java/org/apache/skywalking/oap/server/cluster/plugin/kubernetes/dependencies/NamespacedPodListWatchTest.java
@@ -0,0 +1,166 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.skywalking.oap.server.cluster.plugin.kubernetes.dependencies;
+
+import com.squareup.okhttp.Call;
+import com.squareup.okhttp.OkHttpClient;
+import com.squareup.okhttp.Request;
+import io.kubernetes.client.ApiClient;
+import io.kubernetes.client.models.V1ObjectMeta;
+import io.kubernetes.client.models.V1Pod;
+import io.kubernetes.client.models.V1PodStatus;
+import io.kubernetes.client.util.Watch;
+import org.apache.skywalking.oap.server.cluster.plugin.kubernetes.Event;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.ArgumentCaptor;
+import org.powermock.api.mockito.PowerMockito;
+import org.powermock.core.classloader.annotations.PowerMockIgnore;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+import org.powermock.reflect.Whitebox;
+
+import java.lang.reflect.Type;
+import java.util.Iterator;
+
+import static org.junit.Assert.*;
+import static org.mockito.Mockito.*;
+
+/**
+ * Created by dengming, 2019.05.02
+ */
+@RunWith(PowerMockRunner.class)
+@PrepareForTest({Watch.class, OkHttpClient.class})
+@PowerMockIgnore("javax.management.*")
+public class NamespacedPodListWatchTest {
+
+ private NamespacedPodListWatch namespacedPodListWatch;
+
+ private Watch mockWatch = mock(Watch.class);
+
+ private static final String NAME_SPACE = "my-namespace";
+ private static final String LABEL_SELECTOR = "equality-based";
+ private static final String RESPONSE_TYPE = "my-type";
+ private static final int WATCH_TIMEOUT_SECONDS = 3;
+
+
+ @Before
+ public void setUp() throws Exception {
+
+ namespacedPodListWatch = new NamespacedPodListWatch(NAME_SPACE, LABEL_SELECTOR, WATCH_TIMEOUT_SECONDS);
+
+ PowerMockito.mockStatic(Watch.class);
+ when(Watch.createWatch(any(), any(), any())).thenReturn(mockWatch);
+ Call mockCall = mock(Call.class);
+ PowerMockito.whenNew(Call.class).withArguments(any(OkHttpClient.class), any(Request.class)).thenReturn(mockCall);
+
+ namespacedPodListWatch.initOrReset();
+
+ ArgumentCaptor<ApiClient> apiClientArgumentCaptor = ArgumentCaptor.forClass(ApiClient.class);
+ ArgumentCaptor<Call> callArgumentCaptor = ArgumentCaptor.forClass(Call.class);
+ ArgumentCaptor<Type> typeArgumentCaptor = ArgumentCaptor.forClass(Type.class);
+
+ PowerMockito.verifyStatic();
+ Watch.createWatch(
+ apiClientArgumentCaptor.capture(),
+ callArgumentCaptor.capture(),
+ typeArgumentCaptor.capture());
+
+ ApiClient apiClient = apiClientArgumentCaptor.getValue();
+ Call call = callArgumentCaptor.getValue();
+ Type type = typeArgumentCaptor.getValue();
+
+ assertEquals(mockCall, call);
+ assertNotNull(apiClient);
+ assertNotNull(type);
+
+ }
+
+ @Test
+ public void iterator() {
+ when(mockWatch.hasNext()).thenReturn(true, true, false);
+ Iterator mockIterator = mockIterator();
+ when(mockWatch.iterator()).thenReturn(mockIterator);
+ Iterator<Event> iterator = namespacedPodListWatch.iterator();
+
+ assertNotNull(iterator);
+ assertTrue(iterator.hasNext());
+
+ Event event0 = iterator.next();
+ assertNotNull(event0);
+ validateEvent(event0, 0);
+
+ assertTrue(iterator.hasNext());
+ Event event1 = iterator.next();
+ assertNotNull(event1);
+ validateEvent(event1, 1);
+
+ assertFalse(iterator.hasNext());
+
+ }
+
+ @Test
+ public void iteratorWithEmpty() {
+ Iterator iterator = mock(Iterator.class);
+ when(iterator.hasNext()).thenReturn(false);
+ when(mockWatch.iterator()).thenReturn(iterator);
+
+ Iterator<Event> eventIterator = namespacedPodListWatch.iterator();
+ assertFalse(eventIterator.hasNext());
+ }
+
+
+ private Iterator<Watch.Response<V1Pod>> mockIterator() {
+ Iterator<Watch.Response<V1Pod>> iterator = mock(Iterator.class);
+
+ when(iterator.hasNext()).thenReturn(true, true, false);
+ Watch.Response response0 = mockResponse(0);
+ Watch.Response response1 = mockResponse(1);
+
+ when(iterator.next()).thenReturn(response0, response1);
+
+ return iterator;
+ }
+
+ private Watch.Response<V1Pod> mockResponse(int i) {
+ V1Pod v1Pod = new V1Pod();
+ V1ObjectMeta meta = new V1ObjectMeta();
+ V1PodStatus status = new V1PodStatus();
+ status.setPodIP("PodIp" + i);
+ meta.setUid("uid" + i);
+ v1Pod.setMetadata(meta);
+ v1Pod.setStatus(status);
+ Watch.Response response = mock(Watch.Response.class);
+ response.object = v1Pod;
+ response.type = RESPONSE_TYPE;
+ return response;
+ }
+
+ private void validateEvent(Event event, int i) {
+ String type = Whitebox.getInternalState(event, "type");
+ assertEquals(RESPONSE_TYPE, type);
+
+ String uid = Whitebox.getInternalState(event, "uid");
+ assertEquals("uid" + i, uid);
+
+ String host = Whitebox.getInternalState(event, "host");
+ assertEquals("PodIp" + i, host);
+ }
+
+}
\ No newline at end of file
diff --git a/oap-server/server-cluster-plugin/cluster-zookeeper-plugin/src/test/java/org/apache/skywalking/oap/server/cluster/plugin/zookeeper/NodeNameBuilderTest.java b/oap-server/server-cluster-plugin/cluster-zookeeper-plugin/src/test/java/org/apache/skywalking/oap/server/cluster/plugin/zookeeper/NodeNameBuilderTest.java
new file mode 100644
index 0000000..6d7d185
--- /dev/null
+++ b/oap-server/server-cluster-plugin/cluster-zookeeper-plugin/src/test/java/org/apache/skywalking/oap/server/cluster/plugin/zookeeper/NodeNameBuilderTest.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.skywalking.oap.server.cluster.plugin.zookeeper;
+
+import org.junit.Test;
+
+import static org.junit.Assert.*;
+
+/**
+ * Created by dengming, 2019.05.02
+ */
+public class NodeNameBuilderTest {
+
+ @Test
+ public void build() {
+ String moduleName = "my-module";
+ String providerName = "my-provider-name";
+ String nodeName = NodeNameBuilder.build(moduleName, providerName);
+ assertEquals(moduleName + "/" + providerName, nodeName);
+ }
+}
\ No newline at end of file
diff --git a/oap-server/server-cluster-plugin/cluster-zookeeper-plugin/src/test/java/org/apache/skywalking/oap/server/cluster/plugin/zookeeper/ZookeeperCoordinatorTest.java b/oap-server/server-cluster-plugin/cluster-zookeeper-plugin/src/test/java/org/apache/skywalking/oap/server/cluster/plugin/zookeeper/ZookeeperCoordinatorTest.java
new file mode 100644
index 0000000..cc7e58a
--- /dev/null
+++ b/oap-server/server-cluster-plugin/cluster-zookeeper-plugin/src/test/java/org/apache/skywalking/oap/server/cluster/plugin/zookeeper/ZookeeperCoordinatorTest.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.skywalking.oap.server.cluster.plugin.zookeeper;
+
+import com.google.common.base.Strings;
+import org.apache.curator.x.discovery.ServiceCache;
+import org.apache.curator.x.discovery.ServiceCacheBuilder;
+import org.apache.curator.x.discovery.ServiceDiscovery;
+import org.apache.curator.x.discovery.ServiceInstance;
+import org.apache.skywalking.oap.server.core.cluster.RemoteInstance;
+import org.apache.skywalking.oap.server.core.remote.client.Address;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.ArgumentCaptor;
+
+import static org.junit.Assert.*;
+import static org.mockito.Mockito.*;
+
+/**
+ * Created by dengming, 2019.05.02
+ */
+public class ZookeeperCoordinatorTest {
+
+ private ClusterModuleZookeeperConfig config = new ClusterModuleZookeeperConfig();
+
+ private ServiceDiscovery<RemoteInstance> serviceDiscovery = mock(ServiceDiscovery.class);
+
+ private ServiceCacheBuilder cacheBuilder = mock(ServiceCacheBuilder.class);
+
+ private ServiceCache serviceCache = mock(ServiceCache.class);
+
+ private ZookeeperCoordinator coordinator;
+
+ private Address address = new Address("127.0.0.2", 10001, false);
+
+ private Address selfAddress = new Address("127.0.0.1", 1000, true);
+
+ @Before
+ public void setUp() throws Exception {
+ config.setHostPort(address.getHost() + ":" + address.getPort());
+ coordinator = new ZookeeperCoordinator(config, serviceDiscovery);
+ when(serviceDiscovery.serviceCacheBuilder()).thenReturn(cacheBuilder);
+ when(cacheBuilder.name("remote")).thenReturn(cacheBuilder);
+ when(cacheBuilder.build()).thenReturn(serviceCache);
+ doNothing().when(serviceCache).start();
+
+ doNothing().when(serviceDiscovery).registerService(any());
+ }
+
+ @Test
+ public void registerRemote() throws Exception {
+ config.setInternalComHost(selfAddress.getHost());
+ config.setInternalComPort(selfAddress.getPort());
+ RemoteInstance instance = new RemoteInstance(address);
+ coordinator.registerRemote(instance);
+ validateServiceInstance(selfAddress, new RemoteInstance(selfAddress));
+ }
+
+ @Test
+ public void registerRemoteNoNeedInternal() throws Exception {
+ RemoteInstance instance = new RemoteInstance(address);
+ coordinator.registerRemote(instance);
+ validateServiceInstance(address, instance);
+ }
+
+ @SuppressWarnings("unchecked")
+ private void validateServiceInstance(Address address, RemoteInstance instance) throws Exception {
+ ArgumentCaptor<ServiceInstance> argumentCaptor = ArgumentCaptor.forClass(ServiceInstance.class);
+ verify(serviceDiscovery).registerService(argumentCaptor.capture());
+
+ ServiceInstance<RemoteInstance> serviceInstance = argumentCaptor.getValue();
+
+ assertEquals("remote", serviceInstance.getName());
+ assertTrue(!Strings.isNullOrEmpty(serviceInstance.getId()));
+ assertEquals(address.getHost(), serviceInstance.getAddress());
+ assertEquals(address.getPort(), serviceInstance.getPort().intValue());
+
+ RemoteInstance payload = serviceInstance.getPayload();
+ assertEquals(payload.getAddress(), instance.getAddress());
+
+ }
+
+ @Test
+ public void queryRemoteNodes() {
+ }
+}
\ No newline at end of file