You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@skywalking.apache.org by wu...@apache.org on 2019/06/17 03:27:45 UTC
[skywalking] branch master updated: Add cluster-etcd-plugin (#2725)
This is an automated email from the ASF dual-hosted git repository.
wusheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/skywalking.git
The following commit(s) were added to refs/heads/master by this push:
new a70a2b2 Add cluster-etcd-plugin (#2725)
a70a2b2 is described below
commit a70a2b236ef8d33559b263ed0451dc2ff8dbfbf6
Author: Alan Lau <li...@cmss.chinamobile.com>
AuthorDate: Mon Jun 17 11:27:40 2019 +0800
Add cluster-etcd-plugin (#2725)
* Add cluster-etcd-plugin
---
apm-dist/release-docs/LICENSE | 4 +
.../release-docs/licenses/LICENSE-minimal-json.txt | 19 ++
docker/oap/docker-entrypoint.sh | 15 +-
oap-server/pom.xml | 7 +
.../cluster-etcd-plugin/pom.xml | 180 +++++++++++++++++++
.../plugin/etcd/ClusterModuleEtcdConfig.java | 35 ++++
.../plugin/etcd/ClusterModuleEtcdProvider.java | 84 +++++++++
.../cluster/plugin/etcd/EtcdCoordinator.java | 122 +++++++++++++
.../server/cluster/plugin/etcd/EtcdEndpoint.java | 70 ++++++++
.../oap/server/cluster/plugin/etcd/EtcdUtils.java | 50 ++++++
...alking.oap.server.library.module.ModuleProvider | 19 ++
.../plugin/etcd/ClusterModuleEtcdProviderTest.java | 122 +++++++++++++
.../cluster/plugin/etcd/EtcdCoordinatorTest.java | 194 +++++++++++++++++++++
.../plugin/etcd/ITClusterEtcdPluginTest.java | 148 ++++++++++++++++
.../src/test/resources/log4j2.xml} | 32 ++--
oap-server/server-cluster-plugin/pom.xml | 1 +
.../configuration-etcd/pom.xml | 107 ++++++++++++
...alking.oap.server.library.module.ModuleProvider | 19 ++
.../src/test/resources/log4j2.xml} | 32 ++--
oap-server/server-configuration/pom.xml | 1 +
oap-server/server-starter/pom.xml | 5 +
.../src/main/assembly/application.yml | 4 +
.../src/main/resources/application.yml | 4 +
23 files changed, 1232 insertions(+), 42 deletions(-)
diff --git a/apm-dist/release-docs/LICENSE b/apm-dist/release-docs/LICENSE
index fb68dff..3b04ce6 100644
--- a/apm-dist/release-docs/LICENSE
+++ b/apm-dist/release-docs/LICENSE
@@ -318,8 +318,11 @@ The text of each license is the standard Apache 2.0 license.
proto files from prometheus/client_model: https://github.com/prometheus/client_model Apache 2.0
proto files from lyft/protoc-gen-validate: https://github.com/lyft/protoc-gen-validate Apache 2.0
proto files from gogo/googleapis: https://github.com/gogo/googleapis Apache 2.0
+ json-flatter 0.6.0: https://github.com/wnameless/json-flattener Apache 2.0
+ Apache: commons-text 1.4: https://github.com/apache/commons-text Apache 2.0
sundrio 0.9.2: https://github.com/sundrio/sundrio Apache 2.0
Ctripcorp: apollo 1.4.0: https://github.com/ctripcorp/apollo Apache 2.0
+ etcd4j 2.17.0: https://github.com/jurmous/etcd4j Apache 2.0
========================================================================
MIT licenses
@@ -335,6 +338,7 @@ The text of each license is also included at licenses/LICENSE-[project].txt.
jopt-simple 5.0.2: https://github.com/jopt-simple/jopt-simple , MIT
bcpkix-jdk15on 1.55: http://www.bouncycastle.org/licence.html , MIT
bcprov-jdk15on 1.55: http://www.bouncycastle.org/licence.html , MIT
+ minimao-json 0.9.5: https://github.com/ralfstx/minimal-json, MIT
========================================================================
BSD licenses
diff --git a/apm-dist/release-docs/licenses/LICENSE-minimal-json.txt b/apm-dist/release-docs/licenses/LICENSE-minimal-json.txt
new file mode 100644
index 0000000..5e83ac1
--- /dev/null
+++ b/apm-dist/release-docs/licenses/LICENSE-minimal-json.txt
@@ -0,0 +1,19 @@
+Copyright (c) 2013, 2014 EclipseSource
+
+Permission is hereby granted, free of charge, to any person obtaining a copy
+of this software and associated documentation files (the "Software"), to deal
+in the Software without restriction, including without limitation the rights
+to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+copies of the Software, and to permit persons to whom the Software is
+furnished to do so, subject to the following conditions:
+
+The above copyright notice and this permission notice shall be included in all
+copies or substantial portions of the Software.
+
+THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+SOFTWARE.
\ No newline at end of file
diff --git a/docker/oap/docker-entrypoint.sh b/docker/oap/docker-entrypoint.sh
index 7ed09d6..88e3d85 100755
--- a/docker/oap/docker-entrypoint.sh
+++ b/docker/oap/docker-entrypoint.sh
@@ -49,7 +49,7 @@ EOT
}
generateClusterConsul() {
- cat <<EOT >> ${var_application_file}
+ cat <<EOT >> ${var_application_file}
cluster:
consul:
serviceName: \${SW_SERVICE_NAME:"SkyWalking_OAP_Cluster"}
@@ -58,6 +58,16 @@ cluster:
EOT
}
+generateClusterEtcd() {
+ cat <<EOT >> ${var_application_file}
+cluster:
+ etcd:
+ serviceName: \${SW_SERVICE_NAME:"SkyWalking_OAP_Cluster"}
+ # Etcd cluster nodes, example: 10.0.0.1:2379,10.0.0.2:2379,10.0.0.3:2379
+ hostPort: \${SW_CLUSTER_ETCD_HOST_PORT:localhost:2379}
+EOT
+}
+
generateStorageElastisearch() {
cat <<EOT >> ${var_application_file}
storage:
@@ -120,7 +130,7 @@ generateApplicationYaml() {
# validate
[[ -z "$SW_CLUSTER" ]] && [[ -z "$SW_STORAGE" ]] && { echo "Error: please specify \"SW_CLUSTER\" \"SW_STORAGE\""; exit 1; }
- validateVariables "SW_CLUSTER" "$SW_CLUSTER" "standalone zookeeper kubernetes consul"
+ validateVariables "SW_CLUSTER" "$SW_CLUSTER" "standalone zookeeper kubernetes consul etcd"
validateVariables "SW_STORAGE" "$SW_STORAGE" "elasticsearch h2 mysql"
@@ -131,6 +141,7 @@ generateApplicationYaml() {
zookeeper) generateClusterZookeeper;;
kubernetes) generateClusterK8s;;
consul) generateClusterConsul;;
+ etcd) generateClusterEtcd;;
esac
#generate core
diff --git a/oap-server/pom.xml b/oap-server/pom.xml
index 9c8dfad..47afc3a 100644
--- a/oap-server/pom.xml
+++ b/oap-server/pom.xml
@@ -79,6 +79,8 @@
<apollo.version>1.4.0</apollo.version>
<maven-docker-plugin.version>0.30.0</maven-docker-plugin.version>
<nacos.version>1.0.0</nacos.version>
+ <etcd4j.version>2.17.0</etcd4j.version>
+ <etcd.version>v3.2.3</etcd.version>
</properties>
<dependencies>
@@ -371,6 +373,11 @@
<artifactId>nacos-client</artifactId>
<version>${nacos.version}</version>
</dependency>
+ <dependency>
+ <groupId>org.mousio</groupId>
+ <artifactId>etcd4j</artifactId>
+ <version>${etcd4j.version}</version>
+ </dependency>
</dependencies>
</dependencyManagement>
</project>
diff --git a/oap-server/server-cluster-plugin/cluster-etcd-plugin/pom.xml b/oap-server/server-cluster-plugin/cluster-etcd-plugin/pom.xml
new file mode 100644
index 0000000..ad576af
--- /dev/null
+++ b/oap-server/server-cluster-plugin/cluster-etcd-plugin/pom.xml
@@ -0,0 +1,180 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ ~ Licensed to the Apache Software Foundation (ASF) under one or more
+ ~ contributor license agreements. See the NOTICE file distributed with
+ ~ this work for additional information regarding copyright ownership.
+ ~ The ASF licenses this file to You under the Apache License, Version 2.0
+ ~ (the "License"); you may not use this file except in compliance with
+ ~ the License. You may obtain a copy of the License at
+ ~
+ ~ http://www.apache.org/licenses/LICENSE-2.0
+ ~
+ ~ Unless required by applicable law or agreed to in writing, software
+ ~ distributed under the License is distributed on an "AS IS" BASIS,
+ ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ ~ See the License for the specific language governing permissions and
+ ~ limitations under the License.
+ ~
+ -->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <parent>
+ <artifactId>server-cluster-plugin</artifactId>
+ <groupId>org.apache.skywalking</groupId>
+ <version>6.2.0-SNAPSHOT</version>
+ </parent>
+ <modelVersion>4.0.0</modelVersion>
+
+ <artifactId>cluster-etcd-plugin</artifactId>
+
+ <name>cluster-etcd-plugin</name>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.skywalking</groupId>
+ <artifactId>server-core</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>io.netty</groupId>
+ <artifactId>netty-handler</artifactId>
+ <version>4.1.27.Final</version>
+ </dependency>
+
+ <dependency>
+ <groupId>io.netty</groupId>
+ <artifactId>netty-resolver-dns</artifactId>
+ <version>4.1.27.Final</version>
+ </dependency>
+
+
+ <dependency>
+ <groupId>org.mousio</groupId>
+ <artifactId>etcd4j</artifactId>
+ <exclusions>
+ <exclusion>
+ <artifactId>netty-codec-dns</artifactId>
+ <groupId>io.netty</groupId>
+ </exclusion>
+
+ <exclusion>
+ <artifactId>netty-codec-dns</artifactId>
+ <groupId>io.netty</groupId>
+ </exclusion>
+
+ <exclusion>
+ <artifactId>netty-codec-http</artifactId>
+ <groupId>io.netty</groupId>
+ </exclusion>
+
+ <exclusion>
+ <artifactId>netty-handler</artifactId>
+ <groupId>io.netty</groupId>
+ </exclusion>
+
+ <exclusion>
+ <artifactId>netty-resolver-dns</artifactId>
+ <groupId>io.netty</groupId>
+ </exclusion>
+
+ <exclusion>
+ <groupId>com.fasterxml.jackson.module</groupId>
+ <artifactId>jackson-module-afterburner</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+
+
+ <dependency>
+ <groupId>com.fasterxml.jackson.module</groupId>
+ <artifactId>jackson-module-afterburner</artifactId>
+ <version>2.9.5</version>
+ </dependency>
+
+ </dependencies>
+
+ <profiles>
+ <profile>
+ <id>CI-with-IT</id>
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>io.fabric8</groupId>
+ <artifactId>docker-maven-plugin</artifactId>
+ <configuration>
+ <sourceMode>all</sourceMode>
+ <logDate>default</logDate>
+ <verbose>true</verbose>
+ <imagePullPolicy>IfNotPresent</imagePullPolicy>
+ </configuration>
+ <executions>
+ <execution>
+ <id>start</id>
+ <phase>pre-integration-test</phase>
+ <goals>
+ <goal>start</goal>
+ </goals>
+ <configuration>
+ <images>
+ <image>
+ <name>quayio/coreos-etcd:${etcd.version}</name>
+ <alias>etcd-client-integration-test</alias>
+ <run>
+ <ports>
+ <port>+etcd.host:etcd.port:2379</port>
+ </ports>
+ <wait>
+ <time>20000</time>
+ </wait>
+ <entrypoint>
+ <!-- exec form -->
+ <exec>
+ <arg>/usr/local/bin/etcd</arg>
+ <arg>--advertise-client-urls=http://0.0.0.0:2379</arg>
+ <arg>--listen-client-urls=http://0.0.0.0:2379</arg>
+ </exec>
+ </entrypoint>
+ </run>
+ </image>
+ </images>
+ </configuration>
+ </execution>
+ <execution>
+ <id>remove-it-etcd</id>
+ <phase>post-integration-test</phase>
+ <goals>
+ <goal>stop</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-failsafe-plugin</artifactId>
+ <configuration>
+ <systemPropertyVariables>
+ <etcd.host>
+ ${etcd.host}
+ </etcd.host>
+ <etcd.port>
+ ${etcd.port}
+ </etcd.port>
+ </systemPropertyVariables>
+ </configuration>
+ <executions>
+ <execution>
+ <goals>
+ <goal>integration-test</goal>
+ <goal>verify</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ </build>
+ </profile>
+ </profiles>
+</project>
diff --git a/oap-server/server-cluster-plugin/cluster-etcd-plugin/src/main/java/org/apache/skywalking/oap/server/cluster/plugin/etcd/ClusterModuleEtcdConfig.java b/oap-server/server-cluster-plugin/cluster-etcd-plugin/src/main/java/org/apache/skywalking/oap/server/cluster/plugin/etcd/ClusterModuleEtcdConfig.java
new file mode 100644
index 0000000..7e593fc
--- /dev/null
+++ b/oap-server/server-cluster-plugin/cluster-etcd-plugin/src/main/java/org/apache/skywalking/oap/server/cluster/plugin/etcd/ClusterModuleEtcdConfig.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.server.cluster.plugin.etcd;
+
+import lombok.Getter;
+import lombok.Setter;
+import org.apache.skywalking.oap.server.library.module.ModuleConfig;
+
+/**
+ * @author Alan Lau
+ */
+public class ClusterModuleEtcdConfig extends ModuleConfig {
+
+ @Setter @Getter private String serviceName;
+ @Setter @Getter private String hostPort;
+ @Setter @Getter private boolean isSSL;
+ @Setter @Getter private String internalComHost;
+ @Setter @Getter private int internalComPort = -1;
+}
diff --git a/oap-server/server-cluster-plugin/cluster-etcd-plugin/src/main/java/org/apache/skywalking/oap/server/cluster/plugin/etcd/ClusterModuleEtcdProvider.java b/oap-server/server-cluster-plugin/cluster-etcd-plugin/src/main/java/org/apache/skywalking/oap/server/cluster/plugin/etcd/ClusterModuleEtcdProvider.java
new file mode 100644
index 0000000..a900291
--- /dev/null
+++ b/oap-server/server-cluster-plugin/cluster-etcd-plugin/src/main/java/org/apache/skywalking/oap/server/cluster/plugin/etcd/ClusterModuleEtcdProvider.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.server.cluster.plugin.etcd;
+
+import java.net.URI;
+import java.util.List;
+import mousio.etcd4j.EtcdClient;
+import org.apache.skywalking.oap.server.core.CoreModule;
+import org.apache.skywalking.oap.server.core.cluster.ClusterModule;
+import org.apache.skywalking.oap.server.core.cluster.ClusterNodesQuery;
+import org.apache.skywalking.oap.server.core.cluster.ClusterRegister;
+import org.apache.skywalking.oap.server.library.module.ModuleConfig;
+import org.apache.skywalking.oap.server.library.module.ModuleDefine;
+import org.apache.skywalking.oap.server.library.module.ModuleProvider;
+import org.apache.skywalking.oap.server.library.module.ModuleStartException;
+import org.apache.skywalking.oap.server.library.module.ServiceNotProvidedException;
+
+/**
+ * etcd Provider.
+ *
+ * @author Alan Lau
+ */
+public class ClusterModuleEtcdProvider extends ModuleProvider {
+
+ private final ClusterModuleEtcdConfig config;
+
+ private EtcdClient client;
+
+ public ClusterModuleEtcdProvider() {
+ super();
+ this.config = new ClusterModuleEtcdConfig();
+ }
+
+ @Override public String name() {
+ return "etcd";
+ }
+
+ @Override public Class<? extends ModuleDefine> module() {
+ return ClusterModule.class;
+ }
+
+ @Override public ModuleConfig createConfigBeanIfAbsent() {
+ return config;
+ }
+
+ @Override public void prepare() throws ServiceNotProvidedException, ModuleStartException {
+
+ List<URI> uris = EtcdUtils.parse(config);
+
+ //TODO check isSSL
+ client = new EtcdClient(uris.toArray(new URI[] {}));
+ EtcdCoordinator coordinator = new EtcdCoordinator(config, client);
+ this.registerServiceImplementation(ClusterRegister.class, coordinator);
+ this.registerServiceImplementation(ClusterNodesQuery.class, coordinator);
+ }
+
+ @Override public void start() throws ServiceNotProvidedException {
+
+ }
+
+ @Override public void notifyAfterCompleted() throws ServiceNotProvidedException {
+
+ }
+
+ @Override public String[] requiredModules() {
+ return new String[] {CoreModule.NAME};
+ }
+}
diff --git a/oap-server/server-cluster-plugin/cluster-etcd-plugin/src/main/java/org/apache/skywalking/oap/server/cluster/plugin/etcd/EtcdCoordinator.java b/oap-server/server-cluster-plugin/cluster-etcd-plugin/src/main/java/org/apache/skywalking/oap/server/cluster/plugin/etcd/EtcdCoordinator.java
new file mode 100644
index 0000000..8a7df69
--- /dev/null
+++ b/oap-server/server-cluster-plugin/cluster-etcd-plugin/src/main/java/org/apache/skywalking/oap/server/cluster/plugin/etcd/EtcdCoordinator.java
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.server.cluster.plugin.etcd;
+
+import com.google.common.base.Strings;
+import com.google.gson.Gson;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import mousio.etcd4j.EtcdClient;
+import mousio.etcd4j.promises.EtcdResponsePromise;
+import mousio.etcd4j.responses.EtcdKeysResponse;
+import org.apache.skywalking.oap.server.core.cluster.ClusterNodesQuery;
+import org.apache.skywalking.oap.server.core.cluster.ClusterRegister;
+import org.apache.skywalking.oap.server.core.cluster.RemoteInstance;
+import org.apache.skywalking.oap.server.core.cluster.ServiceRegisterException;
+import org.apache.skywalking.oap.server.core.remote.client.Address;
+
+/**
+ * @author Alan Lau
+ */
+public class EtcdCoordinator implements ClusterRegister, ClusterNodesQuery {
+
+ private ClusterModuleEtcdConfig config;
+
+ private EtcdClient client;
+
+ private volatile Address selfAddress;
+
+ private final String serviceName;
+
+ private ScheduledExecutorService service = Executors.newSingleThreadScheduledExecutor();
+
+ private static final Integer KEY_TTL = 45;
+
+ public EtcdCoordinator(ClusterModuleEtcdConfig config, EtcdClient client) {
+ this.config = config;
+ this.client = client;
+ this.serviceName = config.getServiceName();
+ }
+
+ @Override public List<RemoteInstance> queryRemoteNodes() {
+
+ List<RemoteInstance> res = new ArrayList<>();
+ try {
+ EtcdKeysResponse response = client.get(serviceName + "/").send().get();
+ List<EtcdKeysResponse.EtcdNode> nodes = response.getNode().getNodes();
+
+ Gson gson = new Gson();
+ if (nodes != null) {
+ nodes.forEach(node -> {
+ EtcdEndpoint endpoint = gson.fromJson(node.getValue(), EtcdEndpoint.class);
+ res.add(new RemoteInstance(new Address(endpoint.getHost(), endpoint.getPort(), true)));
+ });
+ }
+
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+
+ return res;
+ }
+
+ @Override public void registerRemote(RemoteInstance remoteInstance) throws ServiceRegisterException {
+
+ if (needUsingInternalAddr()) {
+ remoteInstance = new RemoteInstance(new Address(config.getInternalComHost(), config.getInternalComPort(), true));
+ }
+
+ this.selfAddress = remoteInstance.getAddress();
+
+ EtcdEndpoint endpoint = new EtcdEndpoint.Builder().serviceName(serviceName).host(selfAddress.getHost()).port(selfAddress.getPort()).build();
+ try {
+ client.putDir(serviceName).send();
+ String key = buildKey(serviceName, selfAddress, remoteInstance);
+ String json = new Gson().toJson(endpoint);
+ EtcdResponsePromise<EtcdKeysResponse> promise = client.put(key, json).ttl(KEY_TTL).send();
+ //check register.
+ promise.get();
+ renew(client, key, json);
+ } catch (Exception e) {
+ throw new ServiceRegisterException(e.getMessage());
+ }
+
+ }
+
+ private void renew(EtcdClient client, String key, String json) {
+ service.scheduleAtFixedRate(() -> {
+ try {
+ client.refresh(key, KEY_TTL).send().get();
+ } catch (Exception e) {
+
+ }
+ }, 5 * 1000, 30 * 1000, TimeUnit.MILLISECONDS);
+ }
+
+ private String buildKey(String serviceName, Address address, RemoteInstance instance) {
+ return new StringBuilder(serviceName).append("/").append(address.getHost()).append("_").append(instance.hashCode()).toString();
+ }
+
+ private boolean needUsingInternalAddr() {
+ return !Strings.isNullOrEmpty(config.getInternalComHost()) && config.getInternalComPort() > 0;
+ }
+}
diff --git a/oap-server/server-cluster-plugin/cluster-etcd-plugin/src/main/java/org/apache/skywalking/oap/server/cluster/plugin/etcd/EtcdEndpoint.java b/oap-server/server-cluster-plugin/cluster-etcd-plugin/src/main/java/org/apache/skywalking/oap/server/cluster/plugin/etcd/EtcdEndpoint.java
new file mode 100644
index 0000000..a0f51ea
--- /dev/null
+++ b/oap-server/server-cluster-plugin/cluster-etcd-plugin/src/main/java/org/apache/skywalking/oap/server/cluster/plugin/etcd/EtcdEndpoint.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.server.cluster.plugin.etcd;
+
+import java.io.Serializable;
+import lombok.Getter;
+import lombok.Setter;
+
+/**
+ * an instance json to register to etcd.
+ *
+ * @author Alan Lau
+ */
+public class EtcdEndpoint implements Serializable {
+
+ @Setter @Getter private String serviceName;
+
+ @Setter @Getter private String host;
+
+ @Setter @Getter private int port;
+
+ public EtcdEndpoint(Builder builder) {
+ setServiceName(builder.serviceName);
+ setHost(builder.host);
+ setPort(builder.port);
+ }
+
+ public static class Builder {
+ private String serviceName;
+
+ private String host;
+
+ private int port;
+
+ public Builder serviceName(String serviceName) {
+ this.serviceName = serviceName;
+ return this;
+ }
+
+ public Builder host(String host) {
+ this.host = host;
+ return this;
+ }
+
+ public Builder port(int port) {
+ this.port = port;
+ return this;
+ }
+
+ public EtcdEndpoint build() {
+ return new EtcdEndpoint(this);
+ }
+ }
+}
diff --git a/oap-server/server-cluster-plugin/cluster-etcd-plugin/src/main/java/org/apache/skywalking/oap/server/cluster/plugin/etcd/EtcdUtils.java b/oap-server/server-cluster-plugin/cluster-etcd-plugin/src/main/java/org/apache/skywalking/oap/server/cluster/plugin/etcd/EtcdUtils.java
new file mode 100644
index 0000000..3cb2edb
--- /dev/null
+++ b/oap-server/server-cluster-plugin/cluster-etcd-plugin/src/main/java/org/apache/skywalking/oap/server/cluster/plugin/etcd/EtcdUtils.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.server.cluster.plugin.etcd;
+
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.skywalking.oap.server.library.module.ModuleStartException;
+import org.apache.skywalking.oap.server.library.util.Address;
+import org.apache.skywalking.oap.server.library.util.ConnectStringParseException;
+import org.apache.skywalking.oap.server.library.util.ConnectUtils;
+
+/**
+ * @author Alan Lau
+ */
+public class EtcdUtils {
+
+ public EtcdUtils() {
+ }
+
+ public static List<URI> parse(ClusterModuleEtcdConfig config) throws ModuleStartException {
+ List<URI> uris = new ArrayList<>();
+ try {
+ List<Address> addressList = ConnectUtils.parse(config.getHostPort());
+ for (Address address : addressList) {
+ uris.add(URI.create(new StringBuilder("http://").append(address.getHost()).append(":").append(address.getPort()).toString()));
+ }
+ } catch (ConnectStringParseException e) {
+ throw new ModuleStartException(e.getMessage(), e);
+ }
+
+ return uris;
+ }
+}
diff --git a/oap-server/server-cluster-plugin/cluster-etcd-plugin/src/main/resources/META-INF/services/org.apache.skywalking.oap.server.library.module.ModuleProvider b/oap-server/server-cluster-plugin/cluster-etcd-plugin/src/main/resources/META-INF/services/org.apache.skywalking.oap.server.library.module.ModuleProvider
new file mode 100644
index 0000000..bf8dafb
--- /dev/null
+++ b/oap-server/server-cluster-plugin/cluster-etcd-plugin/src/main/resources/META-INF/services/org.apache.skywalking.oap.server.library.module.ModuleProvider
@@ -0,0 +1,19 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+#
+
+org.apache.skywalking.oap.server.cluster.plugin.etcd.ClusterModuleEtcdProvider
\ No newline at end of file
diff --git a/oap-server/server-cluster-plugin/cluster-etcd-plugin/src/test/java/org/apache/skywalking/oap/server/cluster/plugin/etcd/ClusterModuleEtcdProviderTest.java b/oap-server/server-cluster-plugin/cluster-etcd-plugin/src/test/java/org/apache/skywalking/oap/server/cluster/plugin/etcd/ClusterModuleEtcdProviderTest.java
new file mode 100644
index 0000000..7cc1d79
--- /dev/null
+++ b/oap-server/server-cluster-plugin/cluster-etcd-plugin/src/test/java/org/apache/skywalking/oap/server/cluster/plugin/etcd/ClusterModuleEtcdProviderTest.java
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.server.cluster.plugin.etcd;
+
+import java.net.URI;
+import java.util.List;
+import org.apache.skywalking.oap.server.core.CoreModule;
+import org.apache.skywalking.oap.server.core.cluster.ClusterModule;
+import org.apache.skywalking.oap.server.library.module.ModuleConfig;
+import org.apache.skywalking.oap.server.library.module.ModuleStartException;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.ArgumentCaptor;
+import org.powermock.api.mockito.PowerMockito;
+import org.powermock.core.classloader.annotations.PowerMockIgnore;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+import org.powermock.reflect.Whitebox;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.mock;
+
+/**
+ * @author Alan Lau
+ */
+@RunWith(PowerMockRunner.class)
+@PrepareForTest(EtcdUtils.class)
+@PowerMockIgnore("javax.management.*")
+public class ClusterModuleEtcdProviderTest {
+
+ private ClusterModuleEtcdProvider provider = new ClusterModuleEtcdProvider();
+
+ @Test
+ public void name() {
+ assertEquals("etcd", provider.name());
+ }
+
+ @Test
+ public void module() {
+ assertEquals(ClusterModule.class, provider.module());
+ }
+
+ @Test
+ public void createConfigBeanIfAbsent() {
+ ModuleConfig moduleConfig = provider.createConfigBeanIfAbsent();
+ assertTrue(moduleConfig instanceof ClusterModuleEtcdConfig);
+ }
+
+ @Test(expected = ModuleStartException.class)
+ public void prepareWithNonHost() throws Exception {
+ provider.prepare();
+ }
+
+ @Test
+ @SuppressWarnings("unchecked")
+ public void prepare() throws Exception {
+ PowerMockito.mockStatic(EtcdUtils.class);
+ ClusterModuleEtcdConfig etcdConfig = new ClusterModuleEtcdConfig();
+ etcdConfig.setHostPort("10.0.0.1:1000,10.0.0.2:1001");
+ Whitebox.setInternalState(provider, "config", etcdConfig);
+ provider.prepare();
+
+ List<URI> uris = mock(List.class);
+ PowerMockito.when(EtcdUtils.parse(etcdConfig)).thenReturn(uris);
+ ArgumentCaptor<ClusterModuleEtcdConfig> addressCaptor = ArgumentCaptor.forClass(ClusterModuleEtcdConfig.class);
+ PowerMockito.verifyStatic();
+ EtcdUtils.parse(addressCaptor.capture());
+ ClusterModuleEtcdConfig cfg = addressCaptor.getValue();
+ assertEquals(etcdConfig.getHostPort(), cfg.getHostPort());
+ }
+
+ @Test
+ public void prepareSingle() throws Exception {
+ PowerMockito.mockStatic(EtcdUtils.class);
+ ClusterModuleEtcdConfig etcdConfig = new ClusterModuleEtcdConfig();
+ etcdConfig.setHostPort("10.0.0.1:1000");
+ Whitebox.setInternalState(provider, "config", etcdConfig);
+ provider.prepare();
+
+ List<URI> uris = mock(List.class);
+ PowerMockito.when(EtcdUtils.parse(etcdConfig)).thenReturn(uris);
+ ArgumentCaptor<ClusterModuleEtcdConfig> addressCaptor = ArgumentCaptor.forClass(ClusterModuleEtcdConfig.class);
+ PowerMockito.verifyStatic();
+ EtcdUtils.parse(addressCaptor.capture());
+ ClusterModuleEtcdConfig cfg = addressCaptor.getValue();
+ assertEquals(etcdConfig.getHostPort(), cfg.getHostPort());
+ }
+
+ @Test
+ public void start() {
+ provider.start();
+ }
+
+ @Test
+ public void notifyAfterCompleted() {
+ provider.notifyAfterCompleted();
+ }
+
+ @Test
+ public void requiredModules() {
+ String[] modules = provider.requiredModules();
+ assertArrayEquals(new String[] {CoreModule.NAME}, modules);
+ }
+}
diff --git a/oap-server/server-cluster-plugin/cluster-etcd-plugin/src/test/java/org/apache/skywalking/oap/server/cluster/plugin/etcd/EtcdCoordinatorTest.java b/oap-server/server-cluster-plugin/cluster-etcd-plugin/src/test/java/org/apache/skywalking/oap/server/cluster/plugin/etcd/EtcdCoordinatorTest.java
new file mode 100644
index 0000000..dd6b4ff
--- /dev/null
+++ b/oap-server/server-cluster-plugin/cluster-etcd-plugin/src/test/java/org/apache/skywalking/oap/server/cluster/plugin/etcd/EtcdCoordinatorTest.java
@@ -0,0 +1,194 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.server.cluster.plugin.etcd;
+
+import com.google.gson.Gson;
+import java.util.ArrayList;
+import java.util.List;
+import mousio.etcd4j.EtcdClient;
+import mousio.etcd4j.promises.EtcdResponsePromise;
+import mousio.etcd4j.requests.EtcdKeyGetRequest;
+import mousio.etcd4j.requests.EtcdKeyPutRequest;
+import mousio.etcd4j.responses.EtcdKeysResponse;
+import org.apache.skywalking.oap.server.core.cluster.RemoteInstance;
+import org.apache.skywalking.oap.server.core.remote.client.Address;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.ArgumentCaptor;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.powermock.api.mockito.PowerMockito;
+import org.powermock.core.classloader.annotations.PowerMockIgnore;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.mockito.Matchers.anyInt;
+import static org.mockito.Matchers.anyString;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+/**
+ * @author Alan Lau
+ */
+@RunWith(PowerMockRunner.class)
+@PrepareForTest(EtcdKeysResponse.class)
+@PowerMockIgnore("javax.management.*")
+public class EtcdCoordinatorTest {
+
+ private ClusterModuleEtcdConfig etcdConfig = new ClusterModuleEtcdConfig();
+
+ private EtcdClient client;
+
+ private EtcdCoordinator coordinator;
+
+ private Gson gson = new Gson();
+
+ private Address remoteAddress = new Address("10.0.0.1", 1000, false);
+ private Address selfRemoteAddress = new Address("10.0.0.2", 1001, true);
+
+ private Address internalAddress = new Address("10.0.0.3", 1002, false);
+
+ private static final String SERVICE_NAME = "my-service";
+
+ private EtcdResponsePromise<EtcdKeysResponse> getPromise, putPromise;
+
+ private EtcdKeysResponse response;
+
+ private EtcdKeyPutRequest putRequest = mock(EtcdKeyPutRequest.class);
+
+ private EtcdKeyGetRequest getRequest = mock(EtcdKeyGetRequest.class);
+
+ private EtcdKeyPutRequest putDirRequest = mock(EtcdKeyPutRequest.class);
+
+ private EtcdResponsePromise<EtcdKeysResponse> putDirPromise;
+
+ @Mock
+ private List<EtcdKeysResponse.EtcdNode> list = mock(List.class);
+
+ @Before
+ public void setUp() throws Exception {
+ etcdConfig.setServiceName(SERVICE_NAME);
+
+ client = mock(EtcdClient.class);
+ PowerMockito.whenNew(EtcdClient.class).withAnyArguments().thenReturn(client);
+ client = new EtcdClient("http://10.0.0.1:1000", "http://10.0.0.2:2000");
+ coordinator = new EtcdCoordinator(etcdConfig, client);
+
+ putPromise = (EtcdResponsePromise<EtcdKeysResponse>)mock(EtcdResponsePromise.class);
+ getPromise = (EtcdResponsePromise<EtcdKeysResponse>)mock(EtcdResponsePromise.class);
+ putDirPromise = (EtcdResponsePromise<EtcdKeysResponse>)mock(EtcdResponsePromise.class);
+
+ PowerMockito.when(client.putDir(anyString())).thenReturn(putDirRequest);
+ PowerMockito.when(putDirRequest.ttl(anyInt())).thenReturn(putDirRequest);
+ PowerMockito.when(putDirRequest.send()).thenReturn(putDirPromise);
+ PowerMockito.when(client.put(anyString(), anyString())).thenReturn(putRequest);
+ PowerMockito.when(putRequest.ttl(anyInt())).thenReturn(putRequest);
+ PowerMockito.when(putRequest.send()).thenReturn(putPromise);
+ PowerMockito.when(client.get(anyString())).thenReturn(getRequest);
+ PowerMockito.when(getRequest.send()).thenReturn(getPromise);
+
+ response = PowerMockito.mock(EtcdKeysResponse.class);
+
+ response = PowerMockito.mock(EtcdKeysResponse.class);
+ when(putPromise.get()).thenReturn(response);
+ when(getPromise.get()).thenReturn(response);
+ when(putDirPromise.get()).thenReturn(response);
+ }
+
+ @Test
+ @SuppressWarnings("unchecked")
+ public void queryRemoteNodesWithNonOrEmpty() {
+ EtcdKeysResponse.EtcdNode node = PowerMockito.mock(EtcdKeysResponse.EtcdNode.class);
+ when(response.getNode()).thenReturn(node);
+ when(node.getValue()).thenReturn("{}");
+ assertEquals(0, coordinator.queryRemoteNodes().size());
+ assertEquals(0, coordinator.queryRemoteNodes().size());
+ }
+
+ @Test
+ public void queryRemoteNodes() {
+ registerSelfRemote();
+
+ EtcdKeysResponse.EtcdNode node = PowerMockito.mock(EtcdKeysResponse.EtcdNode.class);
+ EtcdKeysResponse.EtcdNode node1 = PowerMockito.mock(EtcdKeysResponse.EtcdNode.class);
+
+ when(response.getNode()).thenReturn(node);
+ list = new ArrayList<>();
+ List list1 = Mockito.spy(list);
+ list1.add(node1);
+ when(node.getNodes()).thenReturn(list1);
+ when(node1.getValue()).thenReturn("{\"serviceId\":\"my-service\",\"host\":\"10.0.0.2\",\"port\":1001}");
+ List<RemoteInstance> remoteInstances = coordinator.queryRemoteNodes();
+ assertEquals(1, remoteInstances.size());
+
+ RemoteInstance selfInstance = remoteInstances.get(0);
+ velidate(selfRemoteAddress, selfInstance);
+ }
+
+ @Test
+ public void registerRemote() {
+ registerRemote(remoteAddress);
+ }
+
+ @Test
+ public void registerSelfRemote() {
+ registerRemote(selfRemoteAddress);
+ }
+
+ @Test
+ public void registerRemoteUsingInternal() {
+ etcdConfig.setInternalComHost(internalAddress.getHost());
+ etcdConfig.setInternalComPort(internalAddress.getPort());
+ registerRemote(internalAddress);
+ }
+
+ private void velidate(Address originArress, RemoteInstance instance) {
+ Address instanceAddress = instance.getAddress();
+ assertEquals(originArress.getHost(), instanceAddress.getHost());
+ assertEquals(originArress.getPort(), instanceAddress.getPort());
+ }
+
+ private void registerRemote(Address address) {
+ coordinator.registerRemote(new RemoteInstance(address));
+ EtcdEndpoint endpoint = afterRegister().get(0);
+ verifyRegistration(address, endpoint);
+ }
+
+ private List<EtcdEndpoint> afterRegister() {
+ ArgumentCaptor<String> argumentCaptor = ArgumentCaptor.forClass(String.class);
+ ArgumentCaptor<String> nameCaptor = ArgumentCaptor.forClass(String.class);
+ verify(client).put(nameCaptor.capture(), argumentCaptor.capture());
+ EtcdEndpoint endpoint = gson.fromJson(argumentCaptor.getValue(), EtcdEndpoint.class);
+ List<EtcdEndpoint> list = new ArrayList<>();
+ list.add(endpoint);
+ return list;
+ }
+
+ private void verifyRegistration(Address remoteAddress, EtcdEndpoint endpoint) {
+ assertNotNull(endpoint);
+ assertEquals(SERVICE_NAME, endpoint.getServiceName());
+ assertEquals(remoteAddress.getHost(), endpoint.getHost());
+ assertEquals(remoteAddress.getPort(), endpoint.getPort());
+ }
+
+}
diff --git a/oap-server/server-cluster-plugin/cluster-etcd-plugin/src/test/java/org/apache/skywalking/oap/server/cluster/plugin/etcd/ITClusterEtcdPluginTest.java b/oap-server/server-cluster-plugin/cluster-etcd-plugin/src/test/java/org/apache/skywalking/oap/server/cluster/plugin/etcd/ITClusterEtcdPluginTest.java
new file mode 100644
index 0000000..c0fa3f8
--- /dev/null
+++ b/oap-server/server-cluster-plugin/cluster-etcd-plugin/src/test/java/org/apache/skywalking/oap/server/cluster/plugin/etcd/ITClusterEtcdPluginTest.java
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.server.cluster.plugin.etcd;
+
+import com.google.gson.Gson;
+import java.net.URI;
+import java.util.List;
+import mousio.etcd4j.EtcdClient;
+import mousio.etcd4j.responses.EtcdKeysResponse;
+import org.apache.skywalking.oap.server.core.cluster.RemoteInstance;
+import org.apache.skywalking.oap.server.core.remote.client.Address;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+
+/**
+ * @author Alan Lau
+ */
+public class ITClusterEtcdPluginTest {
+
+ private static final Logger LOGGER = LoggerFactory.getLogger(ITClusterEtcdPluginTest.class);
+
+ private ClusterModuleEtcdConfig etcdConfig;
+
+ private EtcdClient client;
+
+ private EtcdCoordinator coordinator;
+
+ private Gson gson = new Gson();
+
+ private Address remoteAddress = new Address("10.0.0.1", 1000, false);
+ private Address selfRemoteAddress = new Address("10.0.0.2", 1001, true);
+
+ private Address internalAddress = new Address("10.0.0.3", 1002, false);
+
+ private static final String SERVICE_NAME = "my-service";
+
+ @Before
+ public void before() throws Exception {
+ String etcdHost = System.getProperty("etcd.host");
+ String port = System.getProperty("etcd.port");
+ String baseUrl = "http://" + etcdHost + ":" + port;
+ LOGGER.info("etcd baseURL: {}", baseUrl);
+ etcdConfig = new ClusterModuleEtcdConfig();
+ etcdConfig.setServiceName(SERVICE_NAME);
+ client = new EtcdClient(URI.create(baseUrl));
+ coordinator = new EtcdCoordinator(etcdConfig, client);
+ }
+
+ @After
+ public void after() throws Exception {
+ client.close();
+ }
+
+ @Test
+ public void registerRemote() throws Throwable {
+ registerRemote(remoteAddress);
+ clear();
+ }
+
+ @Test
+ public void registerSelfRemote() throws Throwable {
+ registerRemote(selfRemoteAddress);
+ clear();
+ }
+
+ @Test
+ public void registerRemoteUsingInternal() throws Throwable {
+ etcdConfig.setInternalComHost(internalAddress.getHost());
+ etcdConfig.setInternalComPort(internalAddress.getPort());
+ etcdConfig.setServiceName(SERVICE_NAME);
+ registerRemote(internalAddress);
+ clear();
+ }
+
+ @Test
+ public void queryRemoteNodes() throws Throwable {
+ registerRemote(selfRemoteAddress);
+ List<RemoteInstance> remoteInstances = coordinator.queryRemoteNodes();
+ assertEquals(1, remoteInstances.size());
+
+ RemoteInstance selfInstance = remoteInstances.get(0);
+ velidate(selfRemoteAddress, selfInstance);
+ clear();
+ }
+
+ private void velidate(Address originArress, RemoteInstance instance) {
+ Address instanceAddress = instance.getAddress();
+ assertEquals(originArress.getHost(), instanceAddress.getHost());
+ assertEquals(originArress.getPort(), instanceAddress.getPort());
+ }
+
+ private void registerRemote(Address address) throws Throwable {
+ coordinator.registerRemote(new RemoteInstance(address));
+ EtcdEndpoint endpoint = afterRegister();
+ verifyRegistration(address, endpoint);
+ }
+
+ private EtcdEndpoint afterRegister() throws Throwable {
+ List<RemoteInstance> list = coordinator.queryRemoteNodes();
+ assertEquals(list.size(), 1L);
+ return buildEndpoint(list.get(0));
+ }
+
+ private void clear() throws Throwable {
+ EtcdKeysResponse response = client.get(SERVICE_NAME + "/").send().get();
+ List<EtcdKeysResponse.EtcdNode> nodes = response.getNode().getNodes();
+
+ for (EtcdKeysResponse.EtcdNode node : nodes) {
+ client.delete(node.getKey()).send().get();
+ }
+ }
+
+ private void verifyRegistration(Address remoteAddress, EtcdEndpoint endpoint) {
+ assertNotNull(endpoint);
+ assertEquals(SERVICE_NAME, endpoint.getServiceName());
+ assertEquals(remoteAddress.getHost(), endpoint.getHost());
+ assertEquals(remoteAddress.getPort(), endpoint.getPort());
+ }
+
+ private EtcdEndpoint buildEndpoint(RemoteInstance instance) {
+ Address address = instance.getAddress();
+ EtcdEndpoint endpoint = new EtcdEndpoint.Builder().host(address.getHost()).port(address.getPort()).serviceName(SERVICE_NAME).build();
+ return endpoint;
+ }
+
+}
diff --git a/oap-server/server-configuration/pom.xml b/oap-server/server-cluster-plugin/cluster-etcd-plugin/src/test/resources/log4j2.xml
similarity index 54%
copy from oap-server/server-configuration/pom.xml
copy to oap-server/server-cluster-plugin/cluster-etcd-plugin/src/test/resources/log4j2.xml
index 079ea1d..c9eec4f 100644
--- a/oap-server/server-configuration/pom.xml
+++ b/oap-server/server-cluster-plugin/cluster-etcd-plugin/src/test/resources/log4j2.xml
@@ -17,23 +17,15 @@
~
-->
-<project xmlns="http://maven.apache.org/POM/4.0.0"
- xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
- xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
- <parent>
- <artifactId>oap-server</artifactId>
- <groupId>org.apache.skywalking</groupId>
- <version>6.2.0-SNAPSHOT</version>
- </parent>
- <modelVersion>4.0.0</modelVersion>
-
- <artifactId>server-configuration</artifactId>
- <packaging>pom</packaging>
- <modules>
- <module>configuration-api</module>
- <module>grpc-configuration-sync</module>
- <module>configuration-apollo</module>
- <module>configuration-nacos</module>
- </modules>
-
-</project>
+<Configuration status="info">
+ <Appenders>
+ <Console name="Console" target="SYSTEM_OUT">
+ <PatternLayout charset="UTF-8" pattern="%d - %c -%-4r [%t] %-5p %x - %m%n"/>
+ </Console>
+ </Appenders>
+ <Loggers>
+ <Root level="info">
+ <AppenderRef ref="Console"/>
+ </Root>
+ </Loggers>
+</Configuration>
diff --git a/oap-server/server-cluster-plugin/pom.xml b/oap-server/server-cluster-plugin/pom.xml
index 2cd0a94..fa41273 100644
--- a/oap-server/server-cluster-plugin/pom.xml
+++ b/oap-server/server-cluster-plugin/pom.xml
@@ -33,6 +33,7 @@
<module>cluster-kubernetes-plugin</module>
<module>cluster-consul-plugin</module>
<module>cluster-nacos-plugin</module>
+ <module>cluster-etcd-plugin</module>
</modules>
<dependencies>
diff --git a/oap-server/server-configuration/configuration-etcd/pom.xml b/oap-server/server-configuration/configuration-etcd/pom.xml
new file mode 100644
index 0000000..f1b4405
--- /dev/null
+++ b/oap-server/server-configuration/configuration-etcd/pom.xml
@@ -0,0 +1,107 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ ~ Licensed to the Apache Software Foundation (ASF) under one or more
+ ~ contributor license agreements. See the NOTICE file distributed with
+ ~ this work for additional information regarding copyright ownership.
+ ~ The ASF licenses this file to You under the Apache License, Version 2.0
+ ~ (the "License"); you may not use this file except in compliance with
+ ~ the License. You may obtain a copy of the License at
+ ~
+ ~ http://www.apache.org/licenses/LICENSE-2.0
+ ~
+ ~ Unless required by applicable law or agreed to in writing, software
+ ~ distributed under the License is distributed on an "AS IS" BASIS,
+ ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ ~ See the License for the specific language governing permissions and
+ ~ limitations under the License.
+ ~
+ -->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <parent>
+ <artifactId>server-configuration</artifactId>
+ <groupId>org.apache.skywalking</groupId>
+ <version>6.2.0-SNAPSHOT</version>
+ </parent>
+ <modelVersion>4.0.0</modelVersion>
+
+ <artifactId>configuration-etcd</artifactId>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.skywalking</groupId>
+ <artifactId>configuration-api</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.skywalking</groupId>
+ <artifactId>library-client</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.skywalking</groupId>
+ <artifactId>cluster-etcd-plugin</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ </dependencies>
+
+ <profiles>
+ <profile>
+ <id>CI-with-IT</id>
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>io.fabric8</groupId>
+ <artifactId>docker-maven-plugin</artifactId>
+ <configuration>
+ <sourceMode>all</sourceMode>
+ <logDate>default</logDate>
+ <verbose>true</verbose>
+ <imagePullPolicy>IfNotPresent</imagePullPolicy>
+ </configuration>
+ <executions>
+ <execution>
+ <id>start</id>
+ <phase>pre-integration-test</phase>
+ <goals>
+ <goal>start</goal>
+ </goals>
+ <configuration>
+ <images>
+ <image>
+ <name>quayio/coreos-etcd:${etcd.version}</name>
+ <alias>etcd-client-integration-test</alias>
+ <run>
+ <ports>
+ <port>2379:2379</port>
+ </ports>
+ <wait>
+ <time>30000</time>
+ </wait>
+ <entrypoint>
+ <!-- exec form -->
+ <exec>
+ <arg>/usr/local/bin/etcd</arg>
+ <arg>--advertise-client-urls=http://0.0.0.0:2379</arg>
+ <arg>--listen-client-urls=http://0.0.0.0:2379</arg>
+ </exec>
+ </entrypoint>
+ </run>
+ </image>
+ </images>
+ </configuration>
+ </execution>
+ <execution>
+ <id>remove-it-etcd</id>
+ <phase>post-integration-test</phase>
+ <goals>
+ <goal>stop</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ </build>
+ </profile>
+ </profiles>
+</project>
diff --git a/oap-server/server-configuration/configuration-etcd/src/main/resources/META-INF/services/org.apache.skywalking.oap.server.library.module.ModuleProvider b/oap-server/server-configuration/configuration-etcd/src/main/resources/META-INF/services/org.apache.skywalking.oap.server.library.module.ModuleProvider
new file mode 100644
index 0000000..bf8dafb
--- /dev/null
+++ b/oap-server/server-configuration/configuration-etcd/src/main/resources/META-INF/services/org.apache.skywalking.oap.server.library.module.ModuleProvider
@@ -0,0 +1,19 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+#
+
+org.apache.skywalking.oap.server.cluster.plugin.etcd.ClusterModuleEtcdProvider
\ No newline at end of file
diff --git a/oap-server/server-configuration/pom.xml b/oap-server/server-configuration/configuration-etcd/src/test/resources/log4j2.xml
similarity index 54%
copy from oap-server/server-configuration/pom.xml
copy to oap-server/server-configuration/configuration-etcd/src/test/resources/log4j2.xml
index 079ea1d..c9eec4f 100644
--- a/oap-server/server-configuration/pom.xml
+++ b/oap-server/server-configuration/configuration-etcd/src/test/resources/log4j2.xml
@@ -17,23 +17,15 @@
~
-->
-<project xmlns="http://maven.apache.org/POM/4.0.0"
- xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
- xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
- <parent>
- <artifactId>oap-server</artifactId>
- <groupId>org.apache.skywalking</groupId>
- <version>6.2.0-SNAPSHOT</version>
- </parent>
- <modelVersion>4.0.0</modelVersion>
-
- <artifactId>server-configuration</artifactId>
- <packaging>pom</packaging>
- <modules>
- <module>configuration-api</module>
- <module>grpc-configuration-sync</module>
- <module>configuration-apollo</module>
- <module>configuration-nacos</module>
- </modules>
-
-</project>
+<Configuration status="info">
+ <Appenders>
+ <Console name="Console" target="SYSTEM_OUT">
+ <PatternLayout charset="UTF-8" pattern="%d - %c -%-4r [%t] %-5p %x - %m%n"/>
+ </Console>
+ </Appenders>
+ <Loggers>
+ <Root level="info">
+ <AppenderRef ref="Console"/>
+ </Root>
+ </Loggers>
+</Configuration>
diff --git a/oap-server/server-configuration/pom.xml b/oap-server/server-configuration/pom.xml
index 079ea1d..69f8f34 100644
--- a/oap-server/server-configuration/pom.xml
+++ b/oap-server/server-configuration/pom.xml
@@ -34,6 +34,7 @@
<module>grpc-configuration-sync</module>
<module>configuration-apollo</module>
<module>configuration-nacos</module>
+ <module>configuration-etcd</module>
</modules>
</project>
diff --git a/oap-server/server-starter/pom.xml b/oap-server/server-starter/pom.xml
index 9660de6..1567c49 100644
--- a/oap-server/server-starter/pom.xml
+++ b/oap-server/server-starter/pom.xml
@@ -75,6 +75,11 @@
<artifactId>cluster-nacos-plugin</artifactId>
<version>${project.version}</version>
</dependency>
+ <dependency>
+ <groupId>org.apache.skywalking</groupId>
+ <artifactId>cluster-etcd-plugin</artifactId>
+ <version>${project.version}</version>
+ </dependency>
<!-- cluster module -->
<!-- receiver module -->
diff --git a/oap-server/server-starter/src/main/assembly/application.yml b/oap-server/server-starter/src/main/assembly/application.yml
index f6b0133..c009351 100644
--- a/oap-server/server-starter/src/main/assembly/application.yml
+++ b/oap-server/server-starter/src/main/assembly/application.yml
@@ -36,6 +36,10 @@ cluster:
# nacos:
# serviceName: ${SW_SERVICE_NAME:"SkyWalking_OAP_Cluster"}
# hostPort: ${SW_CLUSTER_NACOS_HOST_PORT:localhost:8848}
+# etcd:
+# serviceName: ${SW_SERVICE_NAME:"SkyWalking_OAP_Cluster"}
+# etcd cluster nodes, example: 10.0.0.1:2379,10.0.0.2:2379,10.0.0.3:2379
+# hostPort: ${SW_CLUSTER_ETCD_HOST_PORT:localhost:2379}
core:
default:
# Mixed: Receive agent data, Level 1 aggregate, Level 2 aggregate
diff --git a/oap-server/server-starter/src/main/resources/application.yml b/oap-server/server-starter/src/main/resources/application.yml
index ac1293d..cb162e0 100644
--- a/oap-server/server-starter/src/main/resources/application.yml
+++ b/oap-server/server-starter/src/main/resources/application.yml
@@ -36,6 +36,10 @@ cluster:
# nacos:
# serviceName: ${SW_SERVICE_NAME:"SkyWalking_OAP_Cluster"}
# hostPort: ${SW_CLUSTER_NACOS_HOST_PORT:localhost:8848}
+# etcd:
+# serviceName: ${SW_SERVICE_NAME:"SkyWalking_OAP_Cluster"}
+# etcd cluster nodes, example: 10.0.0.1:2379,10.0.0.2:2379,10.0.0.3:2379
+# hostPort: ${SW_CLUSTER_ETCD_HOST_PORT:localhost:2379}
core:
default:
# Mixed: Receive agent data, Level 1 aggregate, Level 2 aggregate