You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dubbo.apache.org by yi...@apache.org on 2019/03/11 03:56:26 UTC
[incubator-dubbo] branch master updated: [Dubbo-808] Support etcd
registry (#3605)
This is an automated email from the ASF dual-hosted git repository.
yiji pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-dubbo.git
The following commit(s) were added to refs/heads/master by this push:
new 1ee4d84 [Dubbo-808] Support etcd registry (#3605)
1ee4d84 is described below
commit 1ee4d847252a9aa3458409a25ac72b49b4a36fd1
Author: Huxing Zhang <hu...@gmail.com>
AuthorDate: Mon Mar 11 11:55:58 2019 +0800
[Dubbo-808] Support etcd registry (#3605)
* Merge https://github.com/dubbo/dubbo-registry-etcd into incubator-dubbo
* Add UT to ConfigurationUtilsTest
---
dubbo-bom/pom.xml | 10 +
.../common/config/ConfigurationUtilsTest.java | 27 +
dubbo-dependencies-bom/pom.xml | 16 +
dubbo-registry/dubbo-registry-etcd3/pom.xml | 53 ++
.../apache/dubbo/registry/etcd/EtcdRegistry.java | 373 +++++++++++
.../dubbo/registry/etcd/EtcdRegistryFactory.java | 53 ++
.../org.apache.dubbo.registry.RegistryFactory | 1 +
.../dubbo/registry/etcd/EtcdRegistryTest.java | 316 +++++++++
dubbo-registry/pom.xml | 1 +
dubbo-remoting/dubbo-remoting-etcd3/pom.xml | 52 ++
.../dubbo/remoting/etcd/AbstractRetryPolicy.java | 40 +-
.../apache/dubbo/remoting/etcd/ChildListener.java | 25 +-
.../org/apache/dubbo/remoting/etcd/EtcdClient.java | 167 +++++
.../dubbo/remoting/etcd/EtcdTransporter.java | 47 ++
.../apache/dubbo/remoting/etcd/RetryPolicy.java | 33 +-
.../apache/dubbo/remoting/etcd/StateListener.java | 25 +-
.../dubbo/remoting/etcd/jetcd/JEtcdClient.java | 400 ++++++++++++
.../remoting/etcd/jetcd/JEtcdClientWrapper.java | 706 +++++++++++++++++++++
.../remoting/etcd/jetcd/JEtcdTransporter.java | 28 +-
.../dubbo/remoting/etcd/jetcd/RetryLoops.java | 95 +++
.../dubbo/remoting/etcd/jetcd/RetryNTimes.java | 30 +-
.../dubbo/remoting/etcd/option/Constants.java | 28 +-
.../dubbo/remoting/etcd/option/OptionUtil.java | 76 +++
.../remoting/etcd/support/AbstractEtcdClient.java | 194 ++++++
.../org.apache.dubbo.remoting.etcd.EtcdTransporter | 1 +
.../dubbo/remoting/etcd/jetcd/JEtcdClientTest.java | 260 ++++++++
.../etcd/jetcd/JEtcdClientWrapperTest.java | 186 ++++++
dubbo-remoting/pom.xml | 1 +
28 files changed, 3104 insertions(+), 140 deletions(-)
diff --git a/dubbo-bom/pom.xml b/dubbo-bom/pom.xml
index 73e73df..d04495a 100644
--- a/dubbo-bom/pom.xml
+++ b/dubbo-bom/pom.xml
@@ -140,6 +140,11 @@
</dependency>
<dependency>
<groupId>org.apache.dubbo</groupId>
+ <artifactId>dubbo-remoting-etcd3</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.dubbo</groupId>
<artifactId>dubbo-rpc-api</artifactId>
<version>${project.version}</version>
</dependency>
@@ -220,6 +225,11 @@
</dependency>
<dependency>
<groupId>org.apache.dubbo</groupId>
+ <artifactId>dubbo-registry-etcd3</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.dubbo</groupId>
<artifactId>dubbo-registry-consul</artifactId>
<version>${project.version}</version>
</dependency>
diff --git a/dubbo-common/src/test/java/org/apache/dubbo/common/config/ConfigurationUtilsTest.java b/dubbo-common/src/test/java/org/apache/dubbo/common/config/ConfigurationUtilsTest.java
index 89da34c..a16a374 100644
--- a/dubbo-common/src/test/java/org/apache/dubbo/common/config/ConfigurationUtilsTest.java
+++ b/dubbo-common/src/test/java/org/apache/dubbo/common/config/ConfigurationUtilsTest.java
@@ -21,6 +21,8 @@ import org.apache.dubbo.common.Constants;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
+import java.util.Map;
+
/**
*
*/
@@ -39,4 +41,29 @@ public class ConfigurationUtilsTest {
Assertions.assertEquals("10000", ConfigurationUtils.getProperty(Constants.SHUTDOWN_WAIT_KEY));
System.clearProperty(Constants.SHUTDOWN_WAIT_KEY);
}
+
+ /**
+  * Parsing a single {@code key=value} pair is expected to yield exactly one entry.
+  */
+ @Test
+ public void testParseSingleProperties() throws Exception {
+     Map<String, String> parsed = ConfigurationUtils.parseProperties("aaa=bbb");
+     Assertions.assertEquals(1, parsed.size());
+     Assertions.assertEquals("bbb", parsed.get("aaa"));
+ }
+
+ /**
+  * Two pairs separated by a line feed are expected to be parsed as two separate entries.
+  */
+ @Test
+ public void testParseMultipleProperties() throws Exception {
+     String content = "aaa=bbb\nccc=ddd";
+     Map<String, String> parsed = ConfigurationUtils.parseProperties(content);
+     Assertions.assertEquals(2, parsed.size());
+     Assertions.assertEquals("bbb", parsed.get("aaa"));
+     Assertions.assertEquals("ddd", parsed.get("ccc"));
+ }
+
+ /**
+  * A backslash-escaped "n" in the input must not act as a line separator:
+  * the whole text is expected to be parsed as one property whose value keeps the escape.
+  */
+ @Test
+ public void testEscapedNewLine() throws Exception {
+     String content = "dubbo.registry.address=zookeeper://127.0.0.1:2181\\\\ndubbo.protocol.port=20880";
+     String expectedValue = "zookeeper://127.0.0.1:2181\\ndubbo.protocol.port=20880";
+     Map<String, String> parsed = ConfigurationUtils.parseProperties(content);
+     Assertions.assertEquals(1, parsed.size());
+     Assertions.assertEquals(expectedValue, parsed.get("dubbo.registry.address"));
+ }
}
diff --git a/dubbo-dependencies-bom/pom.xml b/dubbo-dependencies-bom/pom.xml
index c9fa963..baa2eed 100644
--- a/dubbo-dependencies-bom/pom.xml
+++ b/dubbo-dependencies-bom/pom.xml
@@ -127,6 +127,7 @@
<rs_api_version>2.0</rs_api_version>
<resteasy_version>3.0.19.Final</resteasy_version>
<tomcat_embed_version>8.5.31</tomcat_embed_version>
+ <jetcd_version>0.3.0</jetcd_version>
<!-- Log libs -->
<slf4j_version>1.7.25</slf4j_version>
<jcl_version>1.2</jcl_version>
@@ -379,6 +380,21 @@
<artifactId>tomcat-embed-logging-juli</artifactId>
<version>${tomcat_embed_version}</version>
</dependency>
+ <dependency>
+ <groupId>io.etcd</groupId>
+ <artifactId>jetcd-core</artifactId>
+ <version>${jetcd_version}</version>
+ <exclusions>
+ <exclusion>
+ <groupId>io.netty</groupId>
+ <artifactId>netty-codec-http2</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>io.netty</groupId>
+ <artifactId>netty-handler-proxy</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
<!-- Log libs -->
<dependency>
<groupId>org.slf4j</groupId>
diff --git a/dubbo-registry/dubbo-registry-etcd3/pom.xml b/dubbo-registry/dubbo-registry-etcd3/pom.xml
new file mode 100644
index 0000000..00b0ae5
--- /dev/null
+++ b/dubbo-registry/dubbo-registry-etcd3/pom.xml
@@ -0,0 +1,53 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ ~ Licensed to the Apache Software Foundation (ASF) under one or more
+ ~ contributor license agreements. See the NOTICE file distributed with
+ ~ this work for additional information regarding copyright ownership.
+ ~ The ASF licenses this file to You under the Apache License, Version 2.0
+ ~ (the "License"); you may not use this file except in compliance with
+ ~ the License. You may obtain a copy of the License at
+ ~
+ ~ http://www.apache.org/licenses/LICENSE-2.0
+ ~
+ ~ Unless required by applicable law or agreed to in writing, software
+ ~ distributed under the License is distributed on an "AS IS" BASIS,
+ ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ ~ See the License for the specific language governing permissions and
+ ~ limitations under the License.
+ -->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+ <parent>
+ <artifactId>dubbo-registry</artifactId>
+ <groupId>org.apache.dubbo</groupId>
+ <version>2.7.1-SNAPSHOT</version>
+ </parent>
+
+ <artifactId>dubbo-registry-etcd3</artifactId>
+ <packaging>jar</packaging>
+ <name>${project.artifactId}</name>
+ <description>The etcd3 registry module of Dubbo project</description>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.dubbo</groupId>
+ <artifactId>dubbo-registry-api</artifactId>
+ <version>${project.parent.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.dubbo</groupId>
+ <artifactId>dubbo-common</artifactId>
+ <version>${project.parent.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.dubbo</groupId>
+ <artifactId>dubbo-remoting-etcd3</artifactId>
+ <version>${project.parent.version}</version>
+ </dependency>
+ </dependencies>
+
+
+</project>
\ No newline at end of file
diff --git a/dubbo-registry/dubbo-registry-etcd3/src/main/java/org/apache/dubbo/registry/etcd/EtcdRegistry.java b/dubbo-registry/dubbo-registry-etcd3/src/main/java/org/apache/dubbo/registry/etcd/EtcdRegistry.java
new file mode 100644
index 0000000..504d521
--- /dev/null
+++ b/dubbo-registry/dubbo-registry-etcd3/src/main/java/org/apache/dubbo/registry/etcd/EtcdRegistry.java
@@ -0,0 +1,373 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dubbo.registry.etcd;
+
+import org.apache.dubbo.common.URL;
+import org.apache.dubbo.common.logger.Logger;
+import org.apache.dubbo.common.logger.LoggerFactory;
+import org.apache.dubbo.common.utils.ConcurrentHashSet;
+import org.apache.dubbo.common.utils.UrlUtils;
+import org.apache.dubbo.registry.NotifyListener;
+import org.apache.dubbo.registry.support.FailbackRegistry;
+import org.apache.dubbo.remoting.etcd.ChildListener;
+import org.apache.dubbo.remoting.etcd.EtcdClient;
+import org.apache.dubbo.remoting.etcd.EtcdTransporter;
+import org.apache.dubbo.remoting.etcd.StateListener;
+import org.apache.dubbo.remoting.etcd.option.Constants;
+import org.apache.dubbo.remoting.etcd.option.OptionUtil;
+import org.apache.dubbo.rpc.RpcException;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+
+
+/**
+ * Support for etcd3 registry.
+ */
+public class EtcdRegistry extends FailbackRegistry {
+
+ private final static Logger logger = LoggerFactory.getLogger(EtcdRegistry.class);
+
+ private final static int DEFAULT_ETCD_PORT = 2379;
+
+ private final static String DEFAULT_ROOT = "dubbo";
+
+ private final String root;
+
+ private final Set<String> anyServices = new ConcurrentHashSet<String>();
+
+ private final ConcurrentMap<URL, ConcurrentMap<NotifyListener, ChildListener>> etcdListeners = new ConcurrentHashMap<URL, ConcurrentMap<NotifyListener, ChildListener>>();
+ private final EtcdClient etcdClient;
+ private long expirePeriod;
+
+ /**
+  * Creates a registry backed by an etcd client obtained from {@code etcdTransporter}.
+  *
+  * <p>The root path is taken from the {@code group} parameter of {@code url}
+  * (falling back to {@code DEFAULT_ROOT}) and is prefixed with the path separator if needed.
+  * Each time the client reports the {@code CONNECTED} state, {@code recover()} is invoked;
+  * an exception thrown by it is logged and not propagated.
+  *
+  * @param url             the registry URL
+  * @param etcdTransporter used to connect the etcd client
+  * @throws IllegalStateException if {@code url.isAnyHost()} returns true
+  */
+ public EtcdRegistry(URL url, EtcdTransporter etcdTransporter) {
+     super(url);
+     if (url.isAnyHost()) {
+         throw new IllegalStateException("registry address is invalid, actual: '" + url.getHost() + "'");
+     }
+     String configuredGroup = url.getParameter(Constants.GROUP_KEY, DEFAULT_ROOT);
+     this.root = configuredGroup.startsWith(Constants.PATH_SEPARATOR)
+             ? configuredGroup
+             : Constants.PATH_SEPARATOR + configuredGroup;
+     this.etcdClient = etcdTransporter.connect(url);
+     this.etcdClient.addStateListener(new StateListener() {
+         @Override
+         public void stateChanged(int state) {
+             if (state != CONNECTED) {
+                 return;
+             }
+             try {
+                 recover();
+             } catch (Exception e) {
+                 logger.error(e.getMessage(), e);
+             }
+         }
+     });
+ }
+
+ /**
+  * Completes {@code address} with the default etcd port when it carries no usable port.
+  *
+  * <p>A null or empty address is returned unchanged. If there is no {@code ':'} the default
+  * port is appended; if the text after the first {@code ':'} parses to {@code 0} it is
+  * replaced by the default port. Any other address is returned as is.
+  *
+  * @param address host or host:port text, may be null
+  * @return the address, possibly extended with {@code DEFAULT_ETCD_PORT}
+  * @throws NumberFormatException if the text after the first {@code ':'} is not an integer
+  */
+ protected static String appendDefaultPort(String address) {
+     if (address == null || address.length() == 0) {
+         return address;
+     }
+     int colon = address.indexOf(':');
+     if (colon < 0) {
+         return address + ":" + DEFAULT_ETCD_PORT;
+     }
+     String portText = address.substring(colon + 1);
+     if (Integer.parseInt(portText) == 0) {
+         return address.substring(0, colon + 1) + DEFAULT_ETCD_PORT;
+     }
+     return address;
+ }
+
+ @Override
+ public void doRegister(URL url) {
+ try {
+ String path = toUrlPath(url);
+ if (url.getParameter(Constants.DYNAMIC_KEY, true)) {
+ etcdClient.createEphemeral(path);
+ return;
+ }
+ etcdClient.create(path);
+ } catch (Throwable e) {
+ throw new RpcException("Failed to register " + url + " to etcd " + getUrl()
+ + ", cause: " + (OptionUtil.isProtocolError(e)
+ ? "etcd3 registy maybe not supported yet or etcd3 registry not available."
+ : e.getMessage()), e);
+ }
+ }
+
+ /**
+  * Deletes the etcd node that corresponds to {@code url}.
+  *
+  * @param url the URL to unregister
+  * @throws RpcException wrapping any failure raised while deleting the node
+  */
+ @Override
+ public void doUnregister(URL url) {
+     try {
+         etcdClient.delete(toUrlPath(url));
+     } catch (Throwable e) {
+         String message = "Failed to unregister " + url + " to etcd " + getUrl() + ", cause: " + e.getMessage();
+         throw new RpcException(message, e);
+     }
+ }
+
+ /**
+  * Subscribes {@code listener} to etcd changes that concern {@code url}.
+  *
+  * <p>When the service interface of {@code url} is the wildcard, the root path is created and
+  * watched, and every service name found below it (initially or through a later event) is
+  * subscribed individually with {@code check=false}. Otherwise every category path of
+  * {@code url} is created and watched, and the listener is notified once with the URLs
+  * currently found under those paths.
+  *
+  * <p>For each (url, listener) pair a single {@link ChildListener} is created and reused.
+  *
+  * @param url      the URL to subscribe for
+  * @param listener the listener to notify
+  * @throws RpcException wrapping any failure raised while subscribing
+  */
+ @Override
+ public void doSubscribe(URL url, NotifyListener listener) {
+     try {
+         if (Constants.ANY_VALUE.equals(url.getServiceInterface())) {
+             String rootPath = toRootPath();
+
+             // Find or create, exactly once, the listener container for this url.
+             ConcurrentMap<NotifyListener, ChildListener> listeners =
+                     etcdListeners.computeIfAbsent(url, key -> new ConcurrentHashMap<>());
+
+             // Find or create, exactly once, the watcher that reacts to new service names.
+             ChildListener interfaceListener = listeners.computeIfAbsent(listener, key -> new ChildListener() {
+                 @Override
+                 public void childChanged(String parentPath, List<String> currentChildren) {
+                     /*
+                      * etcd3 has no "direct children only" watch: watching /dubbo also delivers
+                      * events for keys such as /dubbo/interface/hello. Names that were already
+                      * seen are therefore skipped and only new ones are subscribed.
+                      */
+                     for (String child : currentChildren) {
+                         String service = URL.decode(child);
+                         // add() reports whether the name is new, in a single step.
+                         if (anyServices.add(service)) {
+                             subscribe(url.setPath(service).addParameters(Constants.INTERFACE_KEY, service,
+                                     Constants.CHECK_KEY, String.valueOf(false)), listener);
+                         }
+                     }
+                 }
+             });
+
+             etcdClient.create(rootPath);
+             // Pull the services that already exist, then keep watching the root for new ones.
+             List<String> services = etcdClient.addChildListener(rootPath, interfaceListener);
+             if (services != null) {
+                 for (String encoded : services) {
+                     String service = URL.decode(encoded);
+                     anyServices.add(service);
+                     subscribe(url.setPath(service).addParameters(Constants.INTERFACE_KEY, service,
+                             Constants.CHECK_KEY, String.valueOf(false)), listener);
+                 }
+             }
+         } else {
+             List<URL> urls = new ArrayList<URL>();
+             for (String path : toCategoriesPath(url)) {
+
+                 // Find or create, exactly once, the listener container for this url.
+                 ConcurrentMap<NotifyListener, ChildListener> listeners =
+                         etcdListeners.computeIfAbsent(url, key -> new ConcurrentHashMap<>());
+
+                 // Find or create, exactly once, the watcher shared by all category paths of this url.
+                 ChildListener childListener = listeners.computeIfAbsent(listener, key -> new ChildListener() {
+                     @Override
+                     public void childChanged(String parentPath, List<String> currentChildren) {
+                         EtcdRegistry.this.notify(url, listener,
+                                 toUrlsWithEmpty(url, parentPath, currentChildren));
+                     }
+                 });
+
+                 etcdClient.create(path);
+                 // Pull what already exists under the category, then keep watching it,
+                 // e.g. /dubbo/interface/providers, /dubbo/interface/consumers and so on.
+                 List<String> children = etcdClient.addChildListener(path, childListener);
+                 if (children != null) {
+                     urls.addAll(toUrlsWithEmpty(url, path, children));
+                 }
+             }
+             notify(url, listener, urls);
+         }
+     } catch (Throwable e) {
+         throw new RpcException("Failed to subscribe " + url + " to etcd " + getUrl()
+                 + ", cause: " + (OptionUtil.isProtocolError(e)
+                 ? "etcd3 registy maybe not supported yet or etcd3 registry not available."
+                 : e.getMessage()), e);
+     }
+ }
+
+ /**
+  * Removes the etcd watches that were registered for the given (url, listener) pair.
+  *
+  * <p>Does nothing when no child listener is recorded for the pair. Once every path has been
+  * released, the recorded mapping is dropped so the listener is no longer retained by
+  * {@code etcdListeners}.
+  *
+  * @param url      the URL that was subscribed
+  * @param listener the listener that was subscribed
+  */
+ @Override
+ public void doUnsubscribe(URL url, NotifyListener listener) {
+     ConcurrentMap<NotifyListener, ChildListener> listeners = etcdListeners.get(url);
+     if (listeners == null) {
+         return;
+     }
+     ChildListener etcdListener = listeners.get(listener);
+     if (etcdListener == null) {
+         return;
+     }
+     // One url may be watched on several paths.
+     for (String path : toUnsubscribedPath(url)) {
+         etcdClient.removeChildListener(path, etcdListener);
+     }
+     // Forget the mapping only after all paths were released, so that a later call
+     // still finds the listener if removal failed part-way.
+     listeners.remove(listener, etcdListener);
+ }
+
+ @Override
+ public boolean isAvailable() {
+ return etcdClient.isConnected();
+ }
+
+ @Override
+ public void destroy() {
+ super.destroy();
+ try {
+ etcdClient.close();
+ } catch (Exception e) {
+ logger.warn("Failed to close etcd client " + getUrl() + ", cause: " + e.getMessage(), e);
+ }
+ }
+
+ /**
+  * Returns the root path, prefixed with the path separator when it does not already start with it.
+  *
+  * @return the root directory path
+  */
+ protected String toRootDir() {
+     return root.startsWith(Constants.PATH_SEPARATOR) ? root : Constants.PATH_SEPARATOR + root;
+ }
+
+ /**
+  * Returns the root path exactly as stored by the constructor.
+  *
+  * @return the root path
+  */
+ protected String toRootPath() {
+     return this.root;
+ }
+
+ /**
+  * Builds the etcd path of the service that {@code url} refers to.
+  *
+  * @param url the URL whose service interface is used
+  * @return the root path for the wildcard interface, otherwise the root directory followed by
+  *         the separator and the encoded interface name
+  */
+ protected String toServicePath(URL url) {
+     String serviceInterface = url.getServiceInterface();
+     return Constants.ANY_VALUE.equals(serviceInterface)
+             ? toRootPath()
+             : toRootDir() + Constants.PATH_SEPARATOR + URL.encode(serviceInterface);
+ }
+
+ /**
+  * Builds one etcd path per category requested by {@code url}.
+  *
+  * <p>A wildcard category expands to providers, consumers, routers and configurators;
+  * otherwise the {@code category} parameter is used, defaulting to {@code DEFAULT_CATEGORY}.
+  *
+  * @param url the URL whose service and categories are used
+  * @return the category paths, in the order of the categories
+  */
+ protected String[] toCategoriesPath(URL url) {
+     String[] categories = Constants.ANY_VALUE.equals(url.getParameter(Constants.CATEGORY_KEY))
+             ? new String[]{Constants.PROVIDERS_CATEGORY, Constants.CONSUMERS_CATEGORY,
+                     Constants.ROUTERS_CATEGORY, Constants.CONFIGURATORS_CATEGORY}
+             : url.getParameter(Constants.CATEGORY_KEY, new String[]{Constants.DEFAULT_CATEGORY});
+     String[] paths = new String[categories.length];
+     for (int index = 0; index < paths.length; index++) {
+         paths[index] = toServicePath(url) + Constants.PATH_SEPARATOR + categories[index];
+     }
+     return paths;
+ }
+
+ /**
+  * Builds the path of the single category named by the {@code category} parameter of
+  * {@code url}, defaulting to {@code DEFAULT_CATEGORY}.
+  *
+  * @param url the URL whose service and category are used
+  * @return the service path followed by the separator and the category
+  */
+ protected String toCategoryPath(URL url) {
+     String category = url.getParameter(Constants.CATEGORY_KEY, Constants.DEFAULT_CATEGORY);
+     return toServicePath(url) + Constants.PATH_SEPARATOR + category;
+ }
+
+ /**
+  * Builds the full etcd path for {@code url}.
+  *
+  * @param url the URL to convert
+  * @return the category path followed by the separator and the encoded full URL string
+  */
+ protected String toUrlPath(URL url) {
+     String encodedUrl = URL.encode(url.toFullString());
+     return toCategoryPath(url) + Constants.PATH_SEPARATOR + encodedUrl;
+ }
+
+ /**
+  * Lists the etcd paths whose watches must be removed when {@code url} is unsubscribed.
+  *
+  * <p>For the wildcard interface this is the root path returned by {@code toRootPath()},
+  * the same path {@code doSubscribe} registers its watch on. Deriving it from the
+  * {@code group} parameter of {@code url} could yield a different path and leave the
+  * watch in place. For any other interface the category paths are returned.
+  *
+  * @param url the URL being unsubscribed
+  * @return a new mutable list of the paths to release
+  */
+ protected List<String> toUnsubscribedPath(URL url) {
+     List<String> paths = new ArrayList<>();
+     if (Constants.ANY_VALUE.equals(url.getServiceInterface())) {
+         paths.add(toRootPath());
+     } else {
+         for (String path : toCategoriesPath(url)) {
+             paths.add(path);
+         }
+     }
+     return paths;
+ }
+
+ /**
+  * Converts encoded provider strings into the URLs that match {@code consumer}.
+  *
+  * <p>Each entry is decoded; entries that do not contain {@code HTTP_SUBFIX_KEY} are skipped,
+  * the rest are parsed and kept only when {@code UrlUtils.isMatch(consumer, url)} holds.
+  *
+  * @param consumer  the URL the providers are matched against
+  * @param providers encoded provider strings, may be null or empty
+  * @return a new list of matching URLs, possibly empty
+  */
+ protected List<URL> toUrlsWithoutEmpty(URL consumer, List<String> providers) {
+     List<URL> matched = new ArrayList<URL>();
+     if (providers == null || providers.isEmpty()) {
+         return matched;
+     }
+     for (String encoded : providers) {
+         String decoded = URL.decode(encoded);
+         if (!decoded.contains(Constants.HTTP_SUBFIX_KEY)) {
+             continue;
+         }
+         URL candidate = URL.valueOf(decoded);
+         if (UrlUtils.isMatch(consumer, candidate)) {
+             matched.add(candidate);
+         }
+     }
+     return matched;
+ }
+
+ /**
+  * Same as {@code toUrlsWithoutEmpty}, except that an empty result is replaced by a single
+  * placeholder URL: {@code consumer} with the {@code EMPTY_PROTOCOL} protocol and a
+  * {@code category} parameter set to the last segment of {@code path}.
+  *
+  * @param consumer  the URL the providers are matched against
+  * @param path      the watched path; the text after its last {@code '/'} names the category
+  * @param providers encoded provider strings, may be null or empty
+  * @return a list that contains at least one URL
+  */
+ protected List<URL> toUrlsWithEmpty(URL consumer, String path, List<String> providers) {
+     List<URL> urls = toUrlsWithoutEmpty(consumer, providers);
+     if (urls == null) {
+         // toUrlsWithoutEmpty is overridable; never dereference a null result.
+         urls = new ArrayList<URL>();
+     }
+     if (urls.isEmpty()) {
+         int lastSlash = path.lastIndexOf('/');
+         String category = lastSlash < 0 ? path : path.substring(lastSlash + 1);
+         URL empty = consumer.setProtocol(Constants.EMPTY_PROTOCOL).addParameter(Constants.CATEGORY_KEY, category);
+         urls.add(empty);
+     }
+     return urls;
+ }
+}
diff --git a/dubbo-registry/dubbo-registry-etcd3/src/main/java/org/apache/dubbo/registry/etcd/EtcdRegistryFactory.java b/dubbo-registry/dubbo-registry-etcd3/src/main/java/org/apache/dubbo/registry/etcd/EtcdRegistryFactory.java
new file mode 100644
index 0000000..187da19
--- /dev/null
+++ b/dubbo-registry/dubbo-registry-etcd3/src/main/java/org/apache/dubbo/registry/etcd/EtcdRegistryFactory.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dubbo.registry.etcd;
+
+import org.apache.dubbo.common.URL;
+import org.apache.dubbo.registry.Registry;
+import org.apache.dubbo.registry.support.AbstractRegistryFactory;
+import org.apache.dubbo.remoting.etcd.EtcdTransporter;
+
+public class EtcdRegistryFactory extends AbstractRegistryFactory {
+
+ private EtcdTransporter etcdTransporter;
+
+ @Override
+ protected Registry createRegistry(URL url) {
+ return new EtcdRegistry(url, etcdTransporter);
+ }
+
+ public void setEtcdTransporter(EtcdTransporter etcdTransporter) {
+ this.etcdTransporter = etcdTransporter;
+ }
+}
diff --git a/dubbo-registry/dubbo-registry-etcd3/src/main/resources/META-INF/dubbo/internal/org.apache.dubbo.registry.RegistryFactory b/dubbo-registry/dubbo-registry-etcd3/src/main/resources/META-INF/dubbo/internal/org.apache.dubbo.registry.RegistryFactory
new file mode 100644
index 0000000..4a6d09c
--- /dev/null
+++ b/dubbo-registry/dubbo-registry-etcd3/src/main/resources/META-INF/dubbo/internal/org.apache.dubbo.registry.RegistryFactory
@@ -0,0 +1 @@
+etcd3=org.apache.dubbo.registry.etcd.EtcdRegistryFactory
\ No newline at end of file
diff --git a/dubbo-registry/dubbo-registry-etcd3/src/test/java/org/apache/dubbo/registry/etcd/EtcdRegistryTest.java b/dubbo-registry/dubbo-registry-etcd3/src/test/java/org/apache/dubbo/registry/etcd/EtcdRegistryTest.java
new file mode 100644
index 0000000..c23c1d2
--- /dev/null
+++ b/dubbo-registry/dubbo-registry-etcd3/src/test/java/org/apache/dubbo/registry/etcd/EtcdRegistryTest.java
@@ -0,0 +1,316 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dubbo.registry.etcd;
+
+import org.apache.dubbo.common.Constants;
+import org.apache.dubbo.common.URL;
+import org.apache.dubbo.common.extension.ExtensionLoader;
+import org.apache.dubbo.common.utils.NetUtils;
+import org.apache.dubbo.registry.NotifyListener;
+import org.apache.dubbo.registry.RegistryFactory;
+import org.apache.dubbo.registry.support.AbstractRegistryFactory;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Disabled;
+import org.junit.jupiter.api.Test;
+
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+
+@Disabled
+public class EtcdRegistryTest {
+
+ String service = "org.apache.dubbo.internal.test.DemoServie";
+ String outerService = "org.apache.dubbo.outer.test.OuterDemoServie";
+ URL serviceUrl = URL.valueOf("dubbo://" + NetUtils.getLocalHost() + "/" + service + "?methods=test1,test2");
+ URL serviceUrl2 = URL.valueOf("dubbo://" + NetUtils.getLocalHost() + "/" + service + "?methods=test1,test2,test3");
+ URL serviceUrl3 = URL.valueOf("dubbo://" + NetUtils.getLocalHost() + "/" + outerService + "?methods=test1,test2");
+ URL registryUrl = URL.valueOf("etcd3://127.0.0.1:2379/org.apache.dubbo.registry.RegistryService");
+ URL consumerUrl = URL.valueOf("dubbo://" + NetUtils.getLocalHost() + ":2018" + "/" + service + "?methods=test1,test2");
+ RegistryFactory registryFactory = ExtensionLoader.getExtensionLoader(RegistryFactory.class).getAdaptiveExtension();
+ EtcdRegistry registry;
+ URL subscribe = new URL(
+ Constants.ADMIN_PROTOCOL, NetUtils.getLocalHost(), 0, "",
+ Constants.INTERFACE_KEY, Constants.ANY_VALUE,
+ Constants.GROUP_KEY, Constants.ANY_VALUE,
+ Constants.VERSION_KEY, Constants.ANY_VALUE,
+ Constants.CLASSIFIER_KEY, Constants.ANY_VALUE,
+ Constants.CATEGORY_KEY, Constants.PROVIDERS_CATEGORY + ","
+ + Constants.CONSUMERS_CATEGORY + ","
+ + Constants.ROUTERS_CATEGORY + ","
+ + Constants.CONFIGURATORS_CATEGORY,
+ Constants.ENABLED_KEY, Constants.ANY_VALUE,
+ Constants.CHECK_KEY, String.valueOf(false));
+
+ /**
+  * After registering one URL, the registry must report exactly that URL as registered.
+  */
+ @Test
+ public void test_register() {
+
+     registry.register(serviceUrl);
+     Set<URL> registeredUrls = registry.getRegistered();
+     Assertions.assertEquals(1, registeredUrls.size());
+     Assertions.assertTrue(registeredUrls.contains(serviceUrl));
+
+     // Clean up so the URL does not stay registered.
+     registry.unregister(serviceUrl);
+ }
+
+ /**
+  * A URL that was registered and then unregistered must no longer be reported as registered.
+  */
+ @Test
+ public void test_unregister() {
+
+     registry.register(serviceUrl);
+     Set<URL> beforeUnregister = registry.getRegistered();
+     Assertions.assertTrue(beforeUnregister.size() == 1);
+     Assertions.assertTrue(beforeUnregister.contains(serviceUrl));
+
+     registry.unregister(serviceUrl);
+
+     Set<URL> afterUnregister = registry.getRegistered();
+     Assertions.assertTrue(afterUnregister.isEmpty());
+ }
+
+ /**
+  * Subscribing after a provider was registered must notify the listener with that provider
+  * and record the consumer URL as subscribed.
+  */
+ @Test
+ public void test_subscribe() {
+
+     registry.register(serviceUrl);
+
+     final AtomicReference<URL> firstNotified = new AtomicReference<URL>();
+     registry.subscribe(consumerUrl, urls -> firstNotified.set(urls.get(0)));
+
+     Assertions.assertEquals(serviceUrl.toFullString(), firstNotified.get().toFullString());
+     Map<URL, Set<NotifyListener>> subscribed = registry.getSubscribed();
+     Assertions.assertEquals(consumerUrl, subscribed.keySet().iterator().next());
+ }
+
+ /**
+  * Subscribing first and registering afterwards: the listener must be notified twice within
+  * 15 seconds and the last notification must carry the registered provider.
+  */
+ @Test
+ public void test_subscribe_when_register() throws InterruptedException {
+
+     Assertions.assertTrue(registry.getRegistered().isEmpty());
+     Assertions.assertTrue(registry.getSubscribed().isEmpty());
+
+     CountDownLatch pendingNotifications = new CountDownLatch(2);
+
+     final AtomicReference<URL> lastNotified = new AtomicReference<URL>();
+     registry.subscribe(consumerUrl, urls -> {
+         lastNotified.set(urls.get(0));
+         pendingNotifications.countDown();
+     });
+
+     registry.register(serviceUrl);
+
+     Assertions.assertTrue(pendingNotifications.await(15, TimeUnit.SECONDS));
+
+     Assertions.assertEquals(serviceUrl.toFullString(), lastNotified.get().toFullString());
+     Map<URL, Set<NotifyListener>> subscribed = registry.getSubscribed();
+     Assertions.assertEquals(consumerUrl, subscribed.keySet().iterator().next());
+ }
+
+ /**
+ * Subscribes first, then registers two providers; both must be delivered to the listener.
+ */
+ @Test
+ public void test_subscribe_when_register0() throws InterruptedException {
+
+ // assertEquals reports the actual size on failure, unlike assertTrue(size() == 0)
+ Assertions.assertEquals(0, registry.getRegistered().size());
+ Assertions.assertEquals(0, registry.getSubscribed().size());
+
+ // every callback counts down, including those that only carry an "empty" url
+ CountDownLatch notifications = new CountDownLatch(3);
+ ConcurrentHashMap<URL, Boolean> notifiedUrls = new ConcurrentHashMap<>();
+ registry.subscribe(consumerUrl, new NotifyListener() {
+ @Override
+ public void notify(List<URL> urls) {
+ // constant-first equals keeps the check safe if a url has no protocol
+ if (urls != null && !urls.isEmpty() && !"empty".equals(urls.get(0).getProtocol())) {
+ for (URL url : urls) {
+ notifiedUrls.put(url, true);
+ }
+ }
+
+ notifications.countDown();
+ }
+ });
+
+ registry.register(serviceUrl);
+ registry.register(serviceUrl2);
+
+ Assertions.assertTrue(notifications.await(15, TimeUnit.SECONDS));
+
+ Assertions.assertTrue(notifiedUrls.containsKey(serviceUrl));
+ Assertions.assertTrue(notifiedUrls.containsKey(serviceUrl2));
+ Map<URL, Set<NotifyListener>> subscribed = registry.getSubscribed();
+ Assertions.assertEquals(consumerUrl, subscribed.keySet().iterator().next());
+ }
+
+ /**
+ * Registers a matching provider and an unrelated one (serviceUrl3); the last url
+ * seen by the listener must still be the matching provider.
+ */
+ @Test
+ public void test_subscribe_when_register1() throws InterruptedException {
+
+ // assertEquals reports the actual size on failure, unlike assertTrue(size() == 0)
+ Assertions.assertEquals(0, registry.getRegistered().size());
+ Assertions.assertEquals(0, registry.getSubscribed().size());
+
+ CountDownLatch notifications = new CountDownLatch(2);
+
+ final AtomicReference<URL> lastNotified = new AtomicReference<>();
+ registry.subscribe(consumerUrl, new NotifyListener() {
+ @Override
+ public void notify(List<URL> urls) {
+ lastNotified.set(urls.get(0));
+ notifications.countDown();
+ }
+ });
+
+ registry.register(serviceUrl);
+ // register service3 should not trigger notify
+ registry.register(serviceUrl3);
+
+ Assertions.assertTrue(notifications.await(15, TimeUnit.SECONDS));
+
+ Assertions.assertEquals(serviceUrl, lastNotified.get());
+ Map<URL, Set<NotifyListener>> subscribed = registry.getSubscribed();
+ Assertions.assertEquals(consumerUrl, subscribed.keySet().iterator().next());
+ }
+
+ /**
+ * Subscribes with the {@code subscribe} url and registers three providers;
+ * all three must be delivered to the listener.
+ */
+ @Test
+ public void test_subscribe_when_register2() throws InterruptedException {
+
+ // assertEquals reports the actual size on failure, unlike assertTrue(size() == 0)
+ Assertions.assertEquals(0, registry.getRegistered().size());
+ Assertions.assertEquals(0, registry.getSubscribed().size());
+
+ // only callbacks that carry real (non-"empty") urls count down here
+ CountDownLatch notifications = new CountDownLatch(3);
+
+ ConcurrentHashMap<URL, Boolean> notifiedUrls = new ConcurrentHashMap<>();
+
+ registry.subscribe(subscribe, new NotifyListener() {
+ @Override
+ public void notify(List<URL> urls) {
+ // constant-first equals keeps the check safe if a url has no protocol
+ if (urls != null && !urls.isEmpty() && !"empty".equals(urls.get(0).getProtocol())) {
+ for (URL url : urls) {
+ notifiedUrls.put(url, true);
+ }
+ notifications.countDown();
+ }
+ }
+ });
+
+ registry.register(serviceUrl);
+ registry.register(serviceUrl2);
+ // service3 interface is not equals server2
+ registry.register(serviceUrl3);
+
+ Assertions.assertTrue(notifications.await(15, TimeUnit.SECONDS));
+ Assertions.assertEquals(3, notifiedUrls.size());
+ Assertions.assertTrue(notifiedUrls.containsKey(serviceUrl));
+ Assertions.assertTrue(notifiedUrls.containsKey(serviceUrl2));
+ Assertions.assertTrue(notifiedUrls.containsKey(serviceUrl3));
+ }
+
+ /**
+ * After unsubscribe() the listener must not receive any provider url,
+ * even when a matching provider is registered afterwards.
+ */
+ @Test
+ public void test_unsubscribe() throws InterruptedException {
+
+ // assertEquals reports the actual size on failure, unlike assertTrue(size() == 0)
+ Assertions.assertEquals(0, registry.getRegistered().size());
+ Assertions.assertEquals(0, registry.getSubscribed().size());
+
+ CountDownLatch notifications = new CountDownLatch(2);
+
+ final AtomicReference<URL> notifiedUrl = new AtomicReference<>();
+
+ NotifyListener listener = new NotifyListener() {
+ @Override
+ public void notify(List<URL> urls) {
+ if (urls == null) {
+ return;
+ }
+ for (URL url : urls) {
+ // urls with the "empty" protocol are not counted as provider notifications
+ if (!"empty".equals(url.getProtocol())) {
+ notifiedUrl.set(url);
+ notifications.countDown();
+ }
+ }
+ }
+ };
+ registry.subscribe(consumerUrl, listener);
+ registry.unsubscribe(consumerUrl, listener);
+
+ registry.register(serviceUrl);
+
+ // the latch is expected to time out: no provider notification may arrive
+ Assertions.assertFalse(notifications.await(2, TimeUnit.SECONDS));
+ // expect nothing happen
+ Assertions.assertNull(notifiedUrl.get());
+ }
+
+ /**
+ * Obtains the registry under test; if the instance returned by the factory is not
+ * available, all registries are destroyed and a new instance is requested.
+ */
+ @BeforeEach
+ public void setUp() {
+ registry = (EtcdRegistry) registryFactory.getRegistry(registryUrl);
+ // assertNotNull states the intent directly and gives a clearer failure message
+ Assertions.assertNotNull(registry);
+ if (!registry.isAvailable()) {
+ AbstractRegistryFactory.destroyAll();
+ registry = (EtcdRegistry) registryFactory.getRegistry(registryUrl);
+ }
+ }
+
+ /**
+ * Unregisters every url the tests may have registered and destroys the registry,
+ * so that each test starts from an empty registry.
+ */
+ @AfterEach
+ public void tearDown() {
+
+ registry.unregister(serviceUrl);
+ registry.unregister(serviceUrl2);
+ registry.unregister(serviceUrl3);
+ // NOTE(review): 'subscribe' is only passed to subscribe() in the visible tests, never to register() - confirm this call is needed
+ registry.unregister(subscribe);
+
+ registry.destroy();
+ }
+
+
+}
diff --git a/dubbo-registry/pom.xml b/dubbo-registry/pom.xml
index f74cca0..33a329b 100644
--- a/dubbo-registry/pom.xml
+++ b/dubbo-registry/pom.xml
@@ -35,5 +35,6 @@
<module>dubbo-registry-zookeeper</module>
<module>dubbo-registry-redis</module>
<module>dubbo-registry-consul</module>
+ <module>dubbo-registry-etcd3</module>
</modules>
</project>
diff --git a/dubbo-remoting/dubbo-remoting-etcd3/pom.xml b/dubbo-remoting/dubbo-remoting-etcd3/pom.xml
new file mode 100644
index 0000000..768052f
--- /dev/null
+++ b/dubbo-remoting/dubbo-remoting-etcd3/pom.xml
@@ -0,0 +1,52 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ ~ Licensed to the Apache Software Foundation (ASF) under one or more
+ ~ contributor license agreements. See the NOTICE file distributed with
+ ~ this work for additional information regarding copyright ownership.
+ ~ The ASF licenses this file to You under the Apache License, Version 2.0
+ ~ (the "License"); you may not use this file except in compliance with
+ ~ the License. You may obtain a copy of the License at
+ ~
+ ~ http://www.apache.org/licenses/LICENSE-2.0
+ ~
+ ~ Unless required by applicable law or agreed to in writing, software
+ ~ distributed under the License is distributed on an "AS IS" BASIS,
+ ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ ~ See the License for the specific language governing permissions and
+ ~ limitations under the License.
+ -->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+ <parent>
+ <artifactId>dubbo-remoting</artifactId>
+ <groupId>org.apache.dubbo</groupId>
+ <version>2.7.1-SNAPSHOT</version>
+ </parent>
+ <artifactId>dubbo-remoting-etcd3</artifactId>
+ <packaging>jar</packaging>
+ <name>${project.artifactId}</name>
+ <description>The etcd3 remoting module of Dubbo project</description>
+ <properties>
+ <skip_maven_deploy>false</skip_maven_deploy>
+ </properties>
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.dubbo</groupId>
+ <artifactId>dubbo-common</artifactId>
+ <version>${project.parent.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>io.etcd</groupId>
+ <artifactId>jetcd-core</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>io.netty</groupId>
+ <artifactId>netty-all</artifactId>
+ </dependency>
+ </dependencies>
+
+
+</project>
\ No newline at end of file
diff --git a/dubbo-common/src/test/java/org/apache/dubbo/common/config/ConfigurationUtilsTest.java b/dubbo-remoting/dubbo-remoting-etcd3/src/main/java/org/apache/dubbo/remoting/etcd/AbstractRetryPolicy.java
similarity index 52%
copy from dubbo-common/src/test/java/org/apache/dubbo/common/config/ConfigurationUtilsTest.java
copy to dubbo-remoting/dubbo-remoting-etcd3/src/main/java/org/apache/dubbo/remoting/etcd/AbstractRetryPolicy.java
index 89da34c..f626202 100644
--- a/dubbo-common/src/test/java/org/apache/dubbo/common/config/ConfigurationUtilsTest.java
+++ b/dubbo-remoting/dubbo-remoting-etcd3/src/main/java/org/apache/dubbo/remoting/etcd/AbstractRetryPolicy.java
@@ -14,29 +14,31 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.dubbo.common.config;
+package org.apache.dubbo.remoting.etcd;
-import org.apache.dubbo.common.Constants;
+public abstract class AbstractRetryPolicy implements RetryPolicy {
-import org.junit.jupiter.api.Assertions;
-import org.junit.jupiter.api.Test;
+ private final int maxRetried;
-/**
- *
- */
-public class ConfigurationUtilsTest {
-
- @Test
- public void testGetServerShutdownTimeout () {
- System.setProperty(Constants.SHUTDOWN_WAIT_KEY, " 10000");
- Assertions.assertEquals(10000, ConfigurationUtils.getServerShutdownTimeout());
- System.clearProperty(Constants.SHUTDOWN_WAIT_KEY);
+ protected AbstractRetryPolicy(int maxRetried) {
+ this.maxRetried = maxRetried;
}
- @Test
- public void testGetProperty () {
- System.setProperty(Constants.SHUTDOWN_WAIT_KEY, " 10000");
- Assertions.assertEquals("10000", ConfigurationUtils.getProperty(Constants.SHUTDOWN_WAIT_KEY));
- System.clearProperty(Constants.SHUTDOWN_WAIT_KEY);
+ public boolean shouldRetry(int retried, long elapsed, boolean sleep) {
+ if (retried < maxRetried) {
+ try {
+ if (sleep) {
+ Thread.sleep(getSleepTime(retried, elapsed));
+ }
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ return false;
+ }
+ return true;
+ }
+ return false;
}
+
+ protected abstract long getSleepTime(int retried, long elapsed);
+
}
diff --git a/dubbo-common/src/test/java/org/apache/dubbo/common/config/ConfigurationUtilsTest.java b/dubbo-remoting/dubbo-remoting-etcd3/src/main/java/org/apache/dubbo/remoting/etcd/ChildListener.java
similarity index 51%
copy from dubbo-common/src/test/java/org/apache/dubbo/common/config/ConfigurationUtilsTest.java
copy to dubbo-remoting/dubbo-remoting-etcd3/src/main/java/org/apache/dubbo/remoting/etcd/ChildListener.java
index 89da34c..46a0af8 100644
--- a/dubbo-common/src/test/java/org/apache/dubbo/common/config/ConfigurationUtilsTest.java
+++ b/dubbo-remoting/dubbo-remoting-etcd3/src/main/java/org/apache/dubbo/remoting/etcd/ChildListener.java
@@ -14,29 +14,12 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.dubbo.common.config;
+package org.apache.dubbo.remoting.etcd;
-import org.apache.dubbo.common.Constants;
+import java.util.List;
-import org.junit.jupiter.api.Assertions;
-import org.junit.jupiter.api.Test;
+public interface ChildListener {
-/**
- *
- */
-public class ConfigurationUtilsTest {
-
- @Test
- public void testGetServerShutdownTimeout () {
- System.setProperty(Constants.SHUTDOWN_WAIT_KEY, " 10000");
- Assertions.assertEquals(10000, ConfigurationUtils.getServerShutdownTimeout());
- System.clearProperty(Constants.SHUTDOWN_WAIT_KEY);
- }
+ void childChanged(String path, List<String> children);
- @Test
- public void testGetProperty () {
- System.setProperty(Constants.SHUTDOWN_WAIT_KEY, " 10000");
- Assertions.assertEquals("10000", ConfigurationUtils.getProperty(Constants.SHUTDOWN_WAIT_KEY));
- System.clearProperty(Constants.SHUTDOWN_WAIT_KEY);
- }
}
diff --git a/dubbo-remoting/dubbo-remoting-etcd3/src/main/java/org/apache/dubbo/remoting/etcd/EtcdClient.java b/dubbo-remoting/dubbo-remoting-etcd3/src/main/java/org/apache/dubbo/remoting/etcd/EtcdClient.java
new file mode 100644
index 0000000..b1e765d
--- /dev/null
+++ b/dubbo-remoting/dubbo-remoting-etcd3/src/main/java/org/apache/dubbo/remoting/etcd/EtcdClient.java
@@ -0,0 +1,167 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dubbo.remoting.etcd;
+
+import org.apache.dubbo.common.URL;
+
+import java.util.List;
+import java.util.concurrent.CancellationException;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+public interface EtcdClient {
+
+ /**
+ * Saves the specified path to the etcd registry.
+ *
+ * @param path the path to be saved
+ */
+ void create(String path);
+
+ /**
+ * Saves the specified path to the etcd registry as an ephemeral node.
+ * If the node disconnects from etcd, the path is deleted
+ * automatically by etcd when the session times out.
+ *
+ * @param path the path to be saved
+ * @return the lease of the current path.
+ */
+ long createEphemeral(String path);
+
+ /**
+ * Removes the specified path from the etcd registry.
+ *
+ * @param path the path to be removed
+ */
+ void delete(String path);
+
+ /**
+ * Finds the direct children of a path, excluding the path itself.
+ * Never returns null.
+ *
+ * @param path the path whose direct children are looked up.
+ * @return the direct children; an empty list if the path
+ * has no children.
+ */
+ List<String> getChildren(String path);
+
+ /**
+ * Registers a children listener for the specified path.
+ *
+ * @param path the path to be watched for children being added, deleted or updated.
+ * @param listener the listener triggered when the children change.
+ * @return the direct children; an empty list if the path
+ * has no children.
+ */
+ List<String> addChildListener(String path, ChildListener listener);
+
+ /**
+ * Finds the watcher that backs the children listener of the specified path.
+ *
+ * @param path the path being watched for children being added, deleted or updated.
+ * @param listener the listener triggered when the children change.
+ * @return the watcher if found, otherwise null
+ */
+ <T> T getChildListener(String path, ChildListener listener);
+
+ /**
+ * Unregisters a children listener for the specified path.
+ *
+ * @param path the path to stop watching.
+ * @param listener the listener that was triggered when the children changed.
+ */
+ void removeChildListener(String path, ChildListener listener);
+
+ /**
+ * Adds a listener that is notified when the connection state changes.
+ *
+ * @param listener the listener triggered when the state changes.
+ */
+ void addStateListener(StateListener listener);
+
+ /**
+ * Removes a connection state listener.
+ *
+ * @param listener the previously registered listener to remove; if it
+ * is not registered, nothing happens.
+ */
+ void removeStateListener(StateListener listener);
+
+ /**
+ * Tests whether the current client is active.
+ *
+ * @return true if the connection is active, otherwise false.
+ */
+ boolean isConnected();
+
+ /**
+ * Closes the current client and releases all resources.
+ */
+ void close();
+
+ /**
+ * @return the url of this client.
+ */
+ URL getUrl();
+
+ /**
+ * Creates a new lease with the specified time to live; it should keep waiting if the attempt fails.<p>
+ *
+ * @param second lease time (seconds only).
+ * @return lease id from etcd
+ */
+ long createLease(long second);
+
+ /**
+ * Creates a new lease with the specified ttl, waiting at most the specified timeout.<p>
+ *
+ * @param ttl lease time (seconds only).
+ * @param timeout the maximum time to wait
+ * @param unit the time unit of the timeout argument
+ * @return lease id from etcd
+ * @throws CancellationException if this future was cancelled
+ * @throws ExecutionException if this future completed exceptionally
+ * @throws InterruptedException if the current thread was interrupted
+ * while waiting
+ * @throws TimeoutException if the wait timed out
+ */
+ public long createLease(long ttl, long timeout, TimeUnit unit)
+ throws InterruptedException, ExecutionException, TimeoutException;
+
+ /**
+ * Revokes the specified lease; any path associated with it is removed automatically.
+ *
+ * @param lease the lease to be revoked
+ */
+ void revokeLease(long lease);
+
+}
diff --git a/dubbo-remoting/dubbo-remoting-etcd3/src/main/java/org/apache/dubbo/remoting/etcd/EtcdTransporter.java b/dubbo-remoting/dubbo-remoting-etcd3/src/main/java/org/apache/dubbo/remoting/etcd/EtcdTransporter.java
new file mode 100644
index 0000000..2c0befb
--- /dev/null
+++ b/dubbo-remoting/dubbo-remoting-etcd3/src/main/java/org/apache/dubbo/remoting/etcd/EtcdTransporter.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dubbo.remoting.etcd;
+
+import org.apache.dubbo.common.Constants;
+import org.apache.dubbo.common.URL;
+import org.apache.dubbo.common.extension.Adaptive;
+import org.apache.dubbo.common.extension.SPI;
+
+/**
+ * Extension point that creates {@link EtcdClient} instances; the {@code @SPI} annotation
+ * names "jetcd" as the default extension.
+ */
+@SPI("jetcd")
+public interface EtcdTransporter {
+
+ /**
+ * Connects to etcd using the given url.
+ * The {@code @Adaptive} annotation lists {@code Constants.CLIENT_KEY} and
+ * {@code Constants.TRANSPORTER_KEY}, presumably the url parameters consulted
+ * to select the implementation - confirm against the SPI documentation.
+ *
+ * @param url the url describing the etcd endpoint
+ * @return a client for that url
+ */
+ @Adaptive({Constants.CLIENT_KEY, Constants.TRANSPORTER_KEY})
+ EtcdClient connect(URL url);
+
+}
diff --git a/dubbo-common/src/test/java/org/apache/dubbo/common/config/ConfigurationUtilsTest.java b/dubbo-remoting/dubbo-remoting-etcd3/src/main/java/org/apache/dubbo/remoting/etcd/RetryPolicy.java
similarity index 51%
copy from dubbo-common/src/test/java/org/apache/dubbo/common/config/ConfigurationUtilsTest.java
copy to dubbo-remoting/dubbo-remoting-etcd3/src/main/java/org/apache/dubbo/remoting/etcd/RetryPolicy.java
index 89da34c..b1fc525 100644
--- a/dubbo-common/src/test/java/org/apache/dubbo/common/config/ConfigurationUtilsTest.java
+++ b/dubbo-remoting/dubbo-remoting-etcd3/src/main/java/org/apache/dubbo/remoting/etcd/RetryPolicy.java
@@ -14,29 +14,18 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.dubbo.common.config;
+package org.apache.dubbo.remoting.etcd;
-import org.apache.dubbo.common.Constants;
+public interface RetryPolicy {
-import org.junit.jupiter.api.Assertions;
-import org.junit.jupiter.api.Test;
+ /**
+ * Decides whether a failed operation should be retried.
+ *
+ * @param retried the number of times retried so far
+ * @param elapsed the elapsed time in milliseconds since the operation was attempted
+ * @param sleep whether to sleep before returning when a retry is allowed
+ * @return true if the operation should be retried
+ */
+ public boolean shouldRetry(int retried, long elapsed, boolean sleep);
-/**
- *
- */
-public class ConfigurationUtilsTest {
-
- @Test
- public void testGetServerShutdownTimeout () {
- System.setProperty(Constants.SHUTDOWN_WAIT_KEY, " 10000");
- Assertions.assertEquals(10000, ConfigurationUtils.getServerShutdownTimeout());
- System.clearProperty(Constants.SHUTDOWN_WAIT_KEY);
- }
-
- @Test
- public void testGetProperty () {
- System.setProperty(Constants.SHUTDOWN_WAIT_KEY, " 10000");
- Assertions.assertEquals("10000", ConfigurationUtils.getProperty(Constants.SHUTDOWN_WAIT_KEY));
- System.clearProperty(Constants.SHUTDOWN_WAIT_KEY);
- }
}
diff --git a/dubbo-common/src/test/java/org/apache/dubbo/common/config/ConfigurationUtilsTest.java b/dubbo-remoting/dubbo-remoting-etcd3/src/main/java/org/apache/dubbo/remoting/etcd/StateListener.java
similarity index 51%
copy from dubbo-common/src/test/java/org/apache/dubbo/common/config/ConfigurationUtilsTest.java
copy to dubbo-remoting/dubbo-remoting-etcd3/src/main/java/org/apache/dubbo/remoting/etcd/StateListener.java
index 89da34c..4358083 100644
--- a/dubbo-common/src/test/java/org/apache/dubbo/common/config/ConfigurationUtilsTest.java
+++ b/dubbo-remoting/dubbo-remoting-etcd3/src/main/java/org/apache/dubbo/remoting/etcd/StateListener.java
@@ -14,29 +14,14 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.dubbo.common.config;
+package org.apache.dubbo.remoting.etcd;
-import org.apache.dubbo.common.Constants;
+public interface StateListener {
-import org.junit.jupiter.api.Assertions;
-import org.junit.jupiter.api.Test;
+ int DISCONNECTED = 0;
-/**
- *
- */
-public class ConfigurationUtilsTest {
+ int CONNECTED = 1;
- @Test
- public void testGetServerShutdownTimeout () {
- System.setProperty(Constants.SHUTDOWN_WAIT_KEY, " 10000");
- Assertions.assertEquals(10000, ConfigurationUtils.getServerShutdownTimeout());
- System.clearProperty(Constants.SHUTDOWN_WAIT_KEY);
- }
+ void stateChanged(int connected);
- @Test
- public void testGetProperty () {
- System.setProperty(Constants.SHUTDOWN_WAIT_KEY, " 10000");
- Assertions.assertEquals("10000", ConfigurationUtils.getProperty(Constants.SHUTDOWN_WAIT_KEY));
- System.clearProperty(Constants.SHUTDOWN_WAIT_KEY);
- }
}
diff --git a/dubbo-remoting/dubbo-remoting-etcd3/src/main/java/org/apache/dubbo/remoting/etcd/jetcd/JEtcdClient.java b/dubbo-remoting/dubbo-remoting-etcd3/src/main/java/org/apache/dubbo/remoting/etcd/jetcd/JEtcdClient.java
new file mode 100644
index 0000000..979caee
--- /dev/null
+++ b/dubbo-remoting/dubbo-remoting-etcd3/src/main/java/org/apache/dubbo/remoting/etcd/jetcd/JEtcdClient.java
@@ -0,0 +1,400 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dubbo.remoting.etcd.jetcd;
+
+import com.google.protobuf.ByteString;
+import io.etcd.jetcd.ByteSequence;
+import io.etcd.jetcd.api.Event;
+import io.etcd.jetcd.api.KeyValue;
+import io.etcd.jetcd.api.WatchCancelRequest;
+import io.etcd.jetcd.api.WatchCreateRequest;
+import io.etcd.jetcd.api.WatchGrpc;
+import io.etcd.jetcd.api.WatchRequest;
+import io.etcd.jetcd.api.WatchResponse;
+import io.etcd.jetcd.common.exception.ClosedClientException;
+import io.grpc.Status;
+import io.grpc.stub.StreamObserver;
+import io.netty.util.internal.ConcurrentSet;
+import org.apache.dubbo.common.Constants;
+import org.apache.dubbo.common.URL;
+import org.apache.dubbo.common.logger.Logger;
+import org.apache.dubbo.common.logger.LoggerFactory;
+import org.apache.dubbo.common.utils.NamedThreadFactory;
+import org.apache.dubbo.remoting.etcd.ChildListener;
+import org.apache.dubbo.remoting.etcd.StateListener;
+import org.apache.dubbo.remoting.etcd.option.OptionUtil;
+import org.apache.dubbo.remoting.etcd.support.AbstractEtcdClient;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Random;
+import java.util.Set;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import static java.util.stream.Collectors.toList;
+import static org.apache.dubbo.remoting.etcd.jetcd.JEtcdClientWrapper.UTF_8;
+
+/**
+ * etcd3 client.
+ */
+public class JEtcdClient extends AbstractEtcdClient<JEtcdClient.EtcdWatcher> {
+
+ private JEtcdClientWrapper clientWrapper;
+ private ScheduledExecutorService reconnectSchedule;
+
+ private int delayPeriod;
+ private Logger logger = LoggerFactory.getLogger(JEtcdClient.class);
+
+ /**
+ * Creates the client wrapper for the given url, forwards its connection state
+ * changes to {@code stateChanged}, prepares the single-threaded scheduler used
+ * for watcher reconnects and finally starts the wrapper.
+ *
+ * @param url the url of the etcd registry
+ * @throws IllegalStateException if any step of the initialization fails
+ */
+ public JEtcdClient(URL url) {
+ super(url);
+ try {
+ clientWrapper = new JEtcdClientWrapper(url);
+ // only CONNECTED and DISCONNECTED are forwarded; any other state value is ignored
+ clientWrapper.setConnectionStateListener((client, state) -> {
+ if (state == StateListener.CONNECTED) {
+ JEtcdClient.this.stateChanged(StateListener.CONNECTED);
+ } else if (state == StateListener.DISCONNECTED) {
+ JEtcdClient.this.stateChanged(StateListener.DISCONNECTED);
+ }
+ });
+ // upper bound (milliseconds) of the random delay used by EtcdWatcher.tryReconnect
+ delayPeriod = getUrl().getParameter(Constants.REGISTRY_RETRY_PERIOD_KEY, Constants.DEFAULT_REGISTRY_RETRY_PERIOD);
+ reconnectSchedule = Executors.newScheduledThreadPool(1,
+ new NamedThreadFactory("auto-reconnect"));
+ clientWrapper.start();
+ } catch (Exception e) {
+ throw new IllegalStateException(e.getMessage(), e);
+ }
+ }
+
+ /**
+ * Delegates to {@code clientWrapper.createPersistent(path)}.
+ */
+ @Override
+ public void doCreatePersistent(String path) {
+ clientWrapper.createPersistent(path);
+ }
+
+ /**
+ * Delegates to {@code clientWrapper.createEphemeral(path)} and returns its result.
+ */
+ @Override
+ public long doCreateEphemeral(String path) {
+ return clientWrapper.createEphemeral(path);
+ }
+
+ /**
+ * Delegates to {@code clientWrapper.checkExists(path)} and returns its result.
+ */
+ @Override
+ public boolean checkExists(String path) {
+ return clientWrapper.checkExists(path);
+ }
+
+ /**
+ * Wraps the listener in a new {@link EtcdWatcher}; the path is not used here,
+ * it is bound later through {@link EtcdWatcher#forPath(String)}.
+ */
+ @Override
+ public EtcdWatcher createChildWatcherListener(String path, ChildListener listener) {
+ return new EtcdWatcher(listener);
+ }
+
+ /**
+ * Starts watching the path with the given watcher and returns what
+ * {@link EtcdWatcher#forPath(String)} returns.
+ */
+ @Override
+ public List<String> addChildWatcherListener(String path, EtcdWatcher etcdWatcher) {
+ return etcdWatcher.forPath(path);
+ }
+
+ /**
+ * Cancels the watch held by the given watcher; the path argument is not used.
+ */
+ @Override
+ public void removeChildWatcherListener(String path, EtcdWatcher etcdWatcher) {
+ etcdWatcher.unwatch();
+ }
+
+ /**
+ * Delegates to {@code clientWrapper.getChildren(path)} and returns its result.
+ */
+ @Override
+ public List<String> getChildren(String path) {
+ return clientWrapper.getChildren(path);
+ }
+
+ /**
+ * Delegates to {@code clientWrapper.isConnected()} and returns its result.
+ */
+ @Override
+ public boolean isConnected() {
+ return clientWrapper.isConnected();
+ }
+
+ /**
+ * Delegates to {@code clientWrapper.createLease(second)} and returns its result.
+ */
+ @Override
+ public long createLease(long second) {
+ return clientWrapper.createLease(second);
+ }
+
+ /**
+ * Delegates to {@code clientWrapper.createLease(ttl, timeout, unit)} and returns its result;
+ * the declared checked exceptions are passed through to the caller.
+ */
+ @Override
+ public long createLease(long ttl, long timeout, TimeUnit unit)
+ throws InterruptedException, ExecutionException, TimeoutException {
+ return clientWrapper.createLease(ttl, timeout, unit);
+ }
+
+ /**
+ * Delegates to {@code clientWrapper.delete(path)}.
+ */
+ @Override
+ public void delete(String path) {
+ clientWrapper.delete(path);
+ }
+
+ /**
+ * Delegates to {@code clientWrapper.revokeLease(lease)}.
+ */
+ @Override
+ public void revokeLease(long lease) {
+ clientWrapper.revokeLease(lease);
+ }
+
+ /**
+ * Stops the reconnect scheduler and then closes the client wrapper.
+ * A failure while stopping the scheduler is logged and never prevents
+ * the wrapper from being closed.
+ */
+ @Override
+ public void doClose() {
+ try {
+ reconnectSchedule.shutdownNow();
+ } catch (Exception e) {
+ // best effort: report the problem instead of swallowing it silently
+ logger.warn("Failed to shut down the reconnect scheduler, cause: " + e.getMessage(), e);
+ } finally {
+ clientWrapper.doClose();
+ }
+ }
+
+ public class EtcdWatcher implements StreamObserver<WatchResponse> {
+
+ protected WatchGrpc.WatchStub watchStub;
+ protected StreamObserver<WatchRequest> watchRequest;
+ protected long watchId;
+ protected String path;
+ protected Throwable throwable;
+ protected Set<String> urls = new ConcurrentSet<>();
+ private ChildListener listener;
+
+ /**
+ * @param listener the listener to call when the watched children change;
+ * it is reset to null by {@link #unwatch()}
+ */
+ public EtcdWatcher(ChildListener listener) {
+ this.listener = listener;
+ }
+
+ /**
+ * Handles a watch response: records the watch id, applies every PUT / DELETE event
+ * of a direct child to the cached child set and, if the set changed, notifies the
+ * listener with a snapshot of the current children.
+ *
+ * @param response the watch response pushed by etcd
+ */
+ @Override
+ public void onNext(WatchResponse response) {
+
+ // prevents grpc on sending watchResponse to a closed watch client.
+ if (!isConnected()) {
+ return;
+ }
+
+ watchId = response.getWatchId();
+
+ // unwatch() resets the field to null; work on one snapshot so the null check
+ // and the later childChanged() call always see the same reference.
+ final ChildListener current = this.listener;
+ if (current == null) {
+ return;
+ }
+
+ int modified = 0;
+ for (Event event : response.getEventsList()) {
+ String service;
+ switch (event.getType()) {
+ case PUT:
+ service = find(event);
+ if (service != null && safeUpdate(service, true)) {
+ modified++;
+ }
+ break;
+ case DELETE:
+ service = find(event);
+ if (service != null && safeUpdate(service, false)) {
+ modified++;
+ }
+ break;
+ default:
+ break;
+ }
+ }
+ if (modified > 0) {
+ current.childChanged(path, new ArrayList<>(urls));
+ }
+ }
+
+ /**
+ * Called when the watch stream fails; hands the error to {@link #tryReconnect(Throwable)}.
+ */
+ @Override
+ public void onError(Throwable e) {
+ tryReconnect(e);
+ }
+
+ /**
+ * Detaches the listener and, if a watch stream is open, sends a cancel request
+ * for the current watch id. Does nothing when the client is not connected.
+ */
+ public void unwatch() {
+
+ // prevents grpc on sending watchResponse to a closed watch client.
+ if (!isConnected()) {
+ return;
+ }
+
+ try {
+ this.listener = null;
+ if (watchRequest == null) {
+ return;
+ }
+ WatchRequest cancelRequest = WatchRequest.newBuilder()
+ .setCancelRequest(WatchCancelRequest.newBuilder().setWatchId(watchId).build())
+ .build();
+ this.watchRequest.onNext(cancelRequest);
+ } catch (Exception e) {
+ logger.warn("Failed to cancel watch for path '" + path + "'", e);
+ }
+ }
+
+ /**
+ * Starts watching the given path and returns its current children.
+ *
+ * @param path the path to watch
+ * @return the current children of the path
+ * @throws ClosedClientException if the client is not connected
+ */
+ public List<String> forPath(String path) {
+
+ if (!isConnected()) {
+ throw new ClosedClientException("watch client has been closed, path '" + path + "'");
+ }
+
+ if (this.path != null) {
+ // already watching this very path: no new watch, just read the children again
+ // NOTE(review): this branch returns getChildren() as-is, while the branch below returns names processed by filterChildren() - confirm both have the same format
+ if (this.path.equals(path)) {
+ return clientWrapper.getChildren(path);
+ }
+ // NOTE(review): unwatch() also sets listener to null, so a watcher moved to a new path would no longer notify - confirm this is intended
+ unwatch();
+ }
+
+ this.watchStub = WatchGrpc.newStub(clientWrapper.getChannel());
+ this.watchRequest = watchStub.watch(this);
+ this.path = path;
+ this.watchRequest.onNext(nextRequest());
+
+ List<String> children = clientWrapper.getChildren(path);
+
+ // cache the current children so later events can be applied as increments
+ if (!children.isEmpty()) {
+ this.urls.addAll(filterChildren(children));
+ }
+
+ return new ArrayList<>(urls);
+ }
+
+ /**
+ * Adds the service to, or removes it from, the cached child set while holding
+ * this watcher's monitor.
+ *
+ * @return true if the set was changed by the operation
+ */
+ private boolean safeUpdate(String service, boolean add) {
+ synchronized (this) {
+ if (add) {
+ // false when the service is already cached
+ return this.urls.add(service);
+ }
+ // false when the service was not cached
+ return this.urls.remove(service);
+ }
+ }
+
+ /**
+ * Extracts the child name from the key of an event when the key is a direct
+ * child of the watched path, i.e. has the form {@code path + "/" + name} with
+ * no further separator in {@code name}.
+ *
+ * @param event the watch event
+ * @return the child name without the path prefix, or null if the key is not a direct child
+ */
+ private String find(Event event) {
+ KeyValue keyValue = event.getKv();
+ String key = keyValue.getKey().toStringUtf8();
+
+ int len = path.length();
+
+ // The watch is prefix based, so keys of sibling nodes that merely share the prefix
+ // (e.g. "/a/foobar/x" for path "/a/foo") arrive here as well. Requiring the separator
+ // to sit directly behind the path rejects them instead of returning a garbled name.
+ boolean separatorFollowsPath = key.indexOf(Constants.PATH_SEPARATOR, len) == len;
+ // a second separator means the key belongs to a deeper level, not a direct child
+ boolean directChild = separatorFollowsPath
+ && key.indexOf(Constants.PATH_SEPARATOR, len + 1) == -1;
+
+ // strip the "path/" prefix
+ return directChild ? key.substring(len + 1) : null;
+ }
+
+ /**
+ * Keeps only the entries that are direct children of the watched path, i.e. have
+ * the form {@code path + "/" + name} with no further separator in {@code name},
+ * and strips the path prefix from them.
+ *
+ * @param children full keys, may be null
+ * @return the names of the direct children; an empty list for null input
+ */
+ private List<String> filterChildren(List<String> children) {
+ if (children == null) {
+ return Collections.emptyList();
+ }
+ if (children.isEmpty()) {
+ return children;
+ }
+ final int len = path.length();
+ // A sequential stream is used on purpose: the work per element is trivial, so a
+ // parallel stream only adds overhead and occupies the shared common pool.
+ return children.stream()
+ // the separator must directly follow the path (rejects siblings that merely
+ // share the prefix) and must be the only one (rejects deeper levels)
+ .filter(child -> child.indexOf(Constants.PATH_SEPARATOR, len) == len
+ && child.indexOf(Constants.PATH_SEPARATOR, len + 1) == -1)
+ .map(child -> child.substring(len + 1))
+ .collect(toList());
+ }
+
+ /**
+ * Creates a new watch request for the current path. The request covers the key
+ * range from {@code path} up to {@code OptionUtil.prefixEndOf(path)} and asks
+ * for progress notifications.
+ *
+ * @return the watch request wrapping the create request
+ */
+ protected WatchRequest nextRequest() {
+
+ WatchCreateRequest.Builder builder = WatchCreateRequest.newBuilder()
+ .setKey(ByteString.copyFromUtf8(path))
+ .setRangeEnd(ByteString.copyFrom(
+ OptionUtil.prefixEndOf(ByteSequence.from(path, UTF_8)).getBytes()))
+ .setProgressNotify(true);
+
+ return WatchRequest.newBuilder().setCreateRequest(builder).build();
+ }
+
+ /**
+ * Records the error and schedules a reconnect of the watch stream after a random
+ * delay below {@code delayPeriod} milliseconds. Nothing is scheduled when the
+ * client is not connected.
+ *
+ * @param e the error reported by the watch stream
+ */
+ public void tryReconnect(Throwable e) {
+
+ this.throwable = e;
+
+ logger.error("watcher client has error occurred, current path '" + path + "'", e);
+
+ // prevents grpc on sending error to a closed watch client.
+ if (!isConnected()) {
+ return;
+ }
+
+ // Random.nextInt(bound) throws IllegalArgumentException for bound <= 0, which would
+ // abort the reconnect when the retry period is configured as 0 or a negative value.
+ final int delay = new Random().nextInt(Math.max(1, delayPeriod));
+
+ Status status = Status.fromThrowable(e);
+ // system may be recover later, current connect won't be lost
+ if (OptionUtil.isHaltError(status) || OptionUtil.isNoLeaderError(status)) {
+ reconnectSchedule.schedule(this::reconnect, delay, TimeUnit.MILLISECONDS);
+ return;
+ }
+ // reconnect with a delay; avoiding immediate retry on a long connection downtime.
+ reconnectSchedule.schedule(this::reconnect, delay, TimeUnit.MILLISECONDS);
+ }
+
+ /**
+ * Closes the current watch stream, if any, and opens a new one for the same path.
+ * Runs while holding this watcher's monitor.
+ */
+ protected synchronized void reconnect() {
+ this.closeWatchRequest();
+ this.recreateWatchRequest();
+ }
+
+ /**
+ * Opens a new watch stream if none is open, sends a fresh watch request for the
+ * current path and clears the recorded error.
+ */
+ protected void recreateWatchRequest() {
+ if (watchRequest == null) {
+ this.watchStub = WatchGrpc.newStub(clientWrapper.getChannel());
+ this.watchRequest = watchStub.watch(this);
+ }
+ this.watchRequest.onNext(nextRequest());
+ this.throwable = null;
+ logger.warn("watch client retried connect for path '" + path + "', connection status : " + isConnected());
+ }
+
+ /**
+ * Completes the outgoing watch request stream, if one is open, and forgets it.
+ */
+ protected void closeWatchRequest() {
+ if (this.watchRequest != null) {
+ this.watchRequest.onCompleted();
+ this.watchRequest = null;
+ }
+ }
+
+ /**
+ * Intentionally empty: completion of the response stream is not acted upon here.
+ */
+ @Override
+ public void onCompleted() {
+ // do not touch this method, if you want terminate this stream.
+ }
+ }
+}
diff --git a/dubbo-remoting/dubbo-remoting-etcd3/src/main/java/org/apache/dubbo/remoting/etcd/jetcd/JEtcdClientWrapper.java b/dubbo-remoting/dubbo-remoting-etcd3/src/main/java/org/apache/dubbo/remoting/etcd/jetcd/JEtcdClientWrapper.java
new file mode 100644
index 0000000..e563cc2
--- /dev/null
+++ b/dubbo-remoting/dubbo-remoting-etcd3/src/main/java/org/apache/dubbo/remoting/etcd/jetcd/JEtcdClientWrapper.java
@@ -0,0 +1,706 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dubbo.remoting.etcd.jetcd;
+
+import io.etcd.jetcd.ByteSequence;
+import io.etcd.jetcd.Client;
+import io.etcd.jetcd.ClientBuilder;
+import io.etcd.jetcd.CloseableClient;
+import io.etcd.jetcd.Observers;
+import io.etcd.jetcd.common.exception.ErrorCode;
+import io.etcd.jetcd.common.exception.EtcdException;
+import io.etcd.jetcd.lease.LeaseKeepAliveResponse;
+import io.etcd.jetcd.options.GetOption;
+import io.etcd.jetcd.options.PutOption;
+import io.grpc.ConnectivityState;
+import io.grpc.ManagedChannel;
+import io.grpc.stub.StreamObserver;
+import io.grpc.util.RoundRobinLoadBalancerFactory;
+import org.apache.dubbo.common.URL;
+import org.apache.dubbo.common.logger.Logger;
+import org.apache.dubbo.common.logger.LoggerFactory;
+import org.apache.dubbo.common.utils.ConcurrentHashSet;
+import org.apache.dubbo.common.utils.NamedThreadFactory;
+import org.apache.dubbo.common.utils.StringUtils;
+import org.apache.dubbo.remoting.etcd.RetryPolicy;
+import org.apache.dubbo.remoting.etcd.StateListener;
+import org.apache.dubbo.remoting.etcd.option.Constants;
+
+import java.lang.reflect.Field;
+import java.lang.reflect.Method;
+import java.nio.charset.Charset;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Consumer;
+
+import static java.util.stream.Collectors.toList;
+
+public class JEtcdClientWrapper {
+
+ private Logger logger = LoggerFactory.getLogger(JEtcdClientWrapper.class);
+
+ private final URL url;
+ private volatile Client client;
+ private volatile boolean started = false;
+ private volatile boolean connectState = false;
+ private ScheduledFuture future;
+ private ScheduledExecutorService reconnectNotify;
+ private AtomicReference<ManagedChannel> channel;
+
+ private ConnectionStateListener connectionStateListener;
+
+ private long expirePeriod;
+
+ private CompletableFuture<Client> completableFuture;
+
+ private RetryPolicy retryPolicy;
+
+ private RuntimeException failed;
+
+ private final ScheduledFuture<?> retryFuture;
+ private final ScheduledExecutorService retryExecutor = Executors.newScheduledThreadPool(1, new NamedThreadFactory("Etcd3RegistryKeepAliveFailedRetryTimer", true));
+
+ private final Set<String> failedRegistered = new ConcurrentHashSet<String>();
+
+ private final Set<String> registeredPaths = new ConcurrentHashSet<>();
+ private volatile CloseableClient keepAlive = null;
+
+ /**
+ * Support temporary nodes to reuse the same lease
+ */
+ private volatile long globalLeaseId;
+
+ private volatile boolean cancelKeepAlive = false;
+
+ public static final Charset UTF_8 = Charset.forName("UTF-8");
+
+ /**
+  * Creates the wrapper, starts building the jetcd client in the background and
+  * schedules the periodic retry of failed ephemeral registrations.
+  *
+  * @param url registry url; supplies the backup addresses, session timeout and retry period
+  */
+ public JEtcdClientWrapper(URL url) {
+     this.url = url;
+
+     // lease ttl in seconds, derived from the session timeout given in milliseconds
+     long periodInSeconds = url.getParameter(Constants.SESSION_TIMEOUT_KEY, Constants.DEFAULT_KEEPALIVE_TIMEOUT) / 1000;
+     this.expirePeriod = periodInSeconds > 0 ? periodInSeconds : Constants.DEFAULT_KEEPALIVE_TIMEOUT / 1000;
+
+     this.channel = new AtomicReference<>();
+     this.completableFuture = CompletableFuture.supplyAsync(() -> prepareClient(url));
+     this.reconnectNotify = Executors.newScheduledThreadPool(1,
+             new NamedThreadFactory("reconnectNotify", true));
+     this.retryPolicy = new RetryNTimes(1, 1000, TimeUnit.MILLISECONDS);
+
+     // thrown by requiredNotNull(...) while no client is available
+     this.failed = new IllegalStateException("Etcd3 registry is not connected yet, url:" + url);
+
+     int retryPeriod = url.getParameter(Constants.REGISTRY_RETRY_PERIOD_KEY, Constants.DEFAULT_REGISTRY_RETRY_PERIOD);
+     this.retryFuture = retryExecutor.scheduleWithFixedDelay(() -> {
+         try {
+             retry();
+         } catch (Throwable t) {
+             logger.error("Unexpected error occur at failed retry, cause: " + t.getMessage(), t);
+         }
+     }, retryPeriod, retryPeriod, TimeUnit.MILLISECONDS);
+ }
+
+ /**
+  * Builds the jetcd client for the url's backup addresses.
+  * The maximum inbound message size comes from the system property
+  * named by GRPC_MAX_INBOUD_SIZE_KEY when it is set, otherwise from DEFAULT_INBOUT_SIZE.
+  */
+ private Client prepareClient(URL url) {
+     String configuredSize = System.getProperty(GRPC_MAX_INBOUD_SIZE_KEY);
+     int inboundLimit = StringUtils.isNotEmpty(configuredSize)
+             ? Integer.parseInt(configuredSize)
+             : DEFAULT_INBOUT_SIZE;
+
+     return Client.builder()
+             .loadBalancerFactory(RoundRobinLoadBalancerFactory.getInstance())
+             .endpoints(endPoints(url.getBackupAddress()))
+             .maxInboundMessageSize(inboundLimit)
+             .build();
+ }
+
+ /**
+ * @return the jetcd client, or {@code null} while it has not been assigned yet
+ */
+ public Client getClient() {
+ return client;
+ }
+
+ /**
+  * Returns the cached channel, replacing it first when it is missing,
+  * shut down or terminated.
+  *
+  * @return the channel currently held by this wrapper
+  */
+ public ManagedChannel getChannel() {
+     ManagedChannel current = channel.get();
+     boolean unusable = current == null || current.isShutdown() || current.isTerminated();
+     if (unusable) {
+         channel.set(newChannel(client));
+     }
+     return channel.get();
+ }
+
+ /**
+ * Finds the direct children of {@code path}, excluding the path itself.
+ * Never returns null.
+ * <p>
+ * All keys starting with {@code path} are fetched; a key is kept when it is longer than
+ * {@code path} and contains exactly one path separator from the end of the prefix onwards.
+ *
+ * @param path the path whose direct children are looked up
+ * @return the full keys of the direct children; an empty list when there are none
+ * @throws IllegalStateException if the lookup fails; the original exception is the cause
+ */
+ public List<String> getChildren(String path) {
+ try {
+ return RetryLoops.invokeWithRetry(
+ new Callable<List<String>>() {
+ @Override
+ public List<String> call() throws Exception {
+ requiredNotNull(client, failed);
+ int len = path.length();
+ return client.getKVClient()
+ .get(ByteSequence.from(path, UTF_8),
+ GetOption.newBuilder().withPrefix(ByteSequence.from(path, UTF_8)).build())
+ .get(DEFAULT_REQUEST_TIMEOUT, TimeUnit.MILLISECONDS)
+ .getKvs().stream().parallel()
+ .filter(pair -> {
+ String key = pair.getKey().toString(UTF_8);
+ // count separators found at or after the end of the prefix
+ int index = len, count = 0;
+ if (key.length() > len) {
+ for (; (index = key.indexOf(Constants.PATH_SEPARATOR, index)) != -1; ++index) {
+ // stop scanning once more than two separators have been seen
+ if (count++ > 1) break;
+ }
+ }
+ // exactly one separator means the key sits one level below path
+ return count == 1;
+ })
+ .map(pair -> pair.getKey().toString(UTF_8))
+ .collect(toList());
+ }
+ }, retryPolicy);
+ } catch (Exception e) {
+ throw new IllegalStateException(e.getMessage(), e);
+ }
+ }
+
+ /**
+  * Tells whether the channel is usable, i.e. its state is READY or IDLE.
+  * The state is queried without requesting a connection.
+  *
+  * @return {@code true} when the channel state is READY or IDLE
+  */
+ public boolean isConnected() {
+     // Read the state once: two separate reads could observe IDLE and then READY
+     // and wrongly report the channel as disconnected.
+     ConnectivityState state = getChannel().getState(false);
+     return ConnectivityState.READY == state || ConnectivityState.IDLE == state;
+ }
+
+ /**
+  * Grants a new lease, going through the retry policy.
+  *
+  * @param second the ttl passed to the lease grant
+  * @return the id of the granted lease
+  * @throws IllegalStateException if the grant fails; the original exception is the cause
+  */
+ public long createLease(long second) {
+     try {
+         return RetryLoops.<Long>invokeWithRetry(() -> {
+             requiredNotNull(client, failed);
+             return client.getLeaseClient()
+                     .grant(second)
+                     .get(DEFAULT_REQUEST_TIMEOUT, TimeUnit.MILLISECONDS)
+                     .getID();
+         }, retryPolicy);
+     } catch (Exception e) {
+         throw new IllegalStateException(e.getMessage(), e);
+     }
+ }
+
+ /**
+  * Revokes the given lease, going through the retry policy.
+  *
+  * @param lease the id of the lease to revoke
+  * @throws IllegalStateException if the revocation fails; the original exception is the cause
+  */
+ public void revokeLease(long lease) {
+     try {
+         RetryLoops.<Void>invokeWithRetry(() -> {
+             requiredNotNull(client, failed);
+             client.getLeaseClient()
+                     .revoke(lease)
+                     .get(DEFAULT_REQUEST_TIMEOUT, TimeUnit.MILLISECONDS);
+             return null;
+         }, retryPolicy);
+     } catch (Exception e) {
+         throw new IllegalStateException(e.getMessage(), e);
+     }
+ }
+
+ /**
+  * Grants a new lease and waits at most the given time for the answer.
+  * A non-positive timeout delegates to {@code createLease(ttl)}, which applies
+  * the default request timeout and the retry policy.
+  *
+  * @param ttl     the ttl passed to the lease grant
+  * @param timeout how long to wait for the grant
+  * @param unit    the unit of {@code timeout}
+  * @return the id of the granted lease
+  */
+ public long createLease(long ttl, long timeout, TimeUnit unit)
+         throws InterruptedException, ExecutionException, TimeoutException {
+
+     if (timeout > 0) {
+         requiredNotNull(client, failed);
+         return client.getLeaseClient()
+                 .grant(ttl)
+                 .get(timeout, unit)
+                 .getID();
+     }
+     return createLease(ttl);
+ }
+
+
+ /**
+  * Checks whether a key equal to {@code path} exists, using a count-only query.
+  *
+  * @param path the key to look for
+  * @return {@code true} when the reported count is greater than zero
+  * @throws IllegalStateException if the query fails; the original exception is the cause
+  */
+ public boolean checkExists(String path) {
+     try {
+         return RetryLoops.<Boolean>invokeWithRetry(() -> {
+             requiredNotNull(client, failed);
+             GetOption countOnly = GetOption.newBuilder().withCountOnly(true).build();
+             long matches = client.getKVClient()
+                     .get(ByteSequence.from(path, UTF_8), countOnly)
+                     .get(DEFAULT_REQUEST_TIMEOUT, TimeUnit.MILLISECONDS)
+                     .getCount();
+             return matches > 0;
+         }, retryPolicy);
+     } catch (Exception e) {
+         throw new IllegalStateException(e.getMessage(), e);
+     }
+ }
+
+ /**
+  * Reads the value stored under {@code path} and parses it as a long.
+  * For internal use only; may change in the future.
+  *
+  * @param path the key to read
+  * @return the first stored value parsed as a long
+  * @throws IllegalStateException if the lookup fails or no value is found; the original exception is the cause
+  */
+ protected Long find(String path) {
+     try {
+         return RetryLoops.<Long>invokeWithRetry(() -> {
+             requiredNotNull(client, failed);
+             return client.getKVClient()
+                     .get(ByteSequence.from(path, UTF_8))
+                     .get(DEFAULT_REQUEST_TIMEOUT, TimeUnit.MILLISECONDS)
+                     .getKvs().stream()
+                     .mapToLong(keyValue -> Long.parseLong(keyValue.getValue().toString(UTF_8)))
+                     .findFirst()
+                     .getAsLong();
+         }, retryPolicy);
+     } catch (Exception e) {
+         throw new IllegalStateException(e.getMessage(), e);
+     }
+ }
+
+ /**
+  * Stores {@code path} as a key without a lease; the value is the path's hash code as text.
+  *
+  * @param path the key to store
+  * @throws IllegalStateException if the put fails; the original exception is the cause
+  */
+ public void createPersistent(String path) {
+     try {
+         RetryLoops.<Void>invokeWithRetry(() -> {
+             requiredNotNull(client, failed);
+             ByteSequence key = ByteSequence.from(path, UTF_8);
+             ByteSequence value = ByteSequence.from(String.valueOf(path.hashCode()), UTF_8);
+             client.getKVClient()
+                     .put(key, value)
+                     .get(DEFAULT_REQUEST_TIMEOUT, TimeUnit.MILLISECONDS);
+             return null;
+         }, retryPolicy);
+     } catch (Exception e) {
+         throw new IllegalStateException(e.getMessage(), e);
+     }
+ }
+
+ /**
+  * Stores {@code path} as an ephemeral key bound to the shared global lease.
+  * The lease is created and kept alive on first use; the path is remembered so it
+  * can be registered again when the lease is lost.
+  *
+  * @param path the key to store
+  * @return the id of the global lease the key is bound to
+  * @throws IllegalStateException if the registration fails; the original exception is the cause
+  */
+ public long createEphemeral(String path) {
+     try {
+         return RetryLoops.<Long>invokeWithRetry(() -> {
+             requiredNotNull(client, failed);
+
+             // make sure the global lease exists before it is used below
+             keepAlive();
+             registeredPaths.add(path);
+
+             PutOption withLease = PutOption.newBuilder().withLeaseId(globalLeaseId).build();
+             client.getKVClient()
+                     .put(ByteSequence.from(path, UTF_8),
+                             ByteSequence.from(String.valueOf(globalLeaseId), UTF_8),
+                             withLease)
+                     .get(DEFAULT_REQUEST_TIMEOUT, TimeUnit.MILLISECONDS);
+             return globalLeaseId;
+         }, retryPolicy);
+     } catch (Exception e) {
+         throw new IllegalStateException(e.getMessage(), e);
+     }
+ }
+
+ /**
+ * Starts keeping the given lease alive without a failure callback.
+ * Kept as a separate public method so that it is easy to mock.
+ *
+ * @param lease the id of the lease to keep alive
+ */
+ public void keepAlive(long lease) {
+ this.keepAlive(lease, null);
+ }
+
+ /**
+ * Replaces the current keep-alive (if any) with one for the given lease.
+ *
+ * @param lease    the id of the lease to keep alive
+ * @param onFailed invoked with {@code null} when the keep-alive stream completes or reports
+ *                 an EtcdException with code NOT_FOUND; may be {@code null} to disable the callback
+ */
+ private <T> void keepAlive(long lease, Consumer<T> onFailed) {
+ final StreamObserver<LeaseKeepAliveResponse> observer = new Observers.Builder()
+ .onError((e) -> {
+ if (e instanceof EtcdException) {
+ EtcdException error = (EtcdException) e;
+ // ttl has expired
+ if (error.getErrorCode() == ErrorCode.NOT_FOUND) {
+ keepAlive0(onFailed);
+ }
+ }
+ }).onCompleted(() -> {
+ // deadline reached.
+ keepAlive0(onFailed);
+ }).build();
+
+ // If there is already a keep-alive, cancel it first
+ cancelKeepAlive();
+
+ // start the new keep-alive and remember it in the keepAlive field
+ this.keepAlive = client.getLeaseClient().keepAlive(lease, observer);
+ }
+
+ /**
+ * Grants the global lease and starts its keep-alive when no keep-alive is active.
+ * The check is repeated while holding this object's monitor, so the lease is
+ * granted by one thread at a time.
+ *
+ * @throws Exception if granting the lease fails or times out
+ */
+ private void keepAlive() throws Exception {
+ if (keepAlive == null) {
+ synchronized (this) {
+ if (keepAlive == null) {
+ this.globalLeaseId = client.getLeaseClient()
+ .grant(expirePeriod)
+ .get(DEFAULT_REQUEST_TIMEOUT, TimeUnit.MILLISECONDS)
+ .getID();
+ // If the keep-alive fails, recovery() re-attempts the registrations
+ keepAlive(globalLeaseId, (NULL) -> recovery());
+ }
+ }
+ }
+ }
+
+ /**
+  * Reports a keep-alive failure to the given callback, if there is one.
+  * <p>
+  * Two scenarios cause the keep-alive to fail:
+  * <ol>
+  * <li>the service is offline;</li>
+  * <li>the local deadline check expired.</li>
+  * </ol>
+  * The multiplexed lease cannot update the local deadline, which in extreme
+  * cases causes the service to be dropped.
+  *
+  * @param onFailed callback invoked with {@code null}; nothing happens when it is {@code null}
+  */
+ private <T> void keepAlive0(Consumer<T> onFailed) {
+     if (onFailed == null) {
+         return;
+     }
+
+     try {
+         if (logger.isWarnEnabled()) {
+             logger.warn("Failed to keep alive for global lease, waiting for retry again.");
+         }
+         onFailed.accept(null);
+     } catch (Exception e) {
+         logger.warn("Failed to recover from global lease expired or lease deadline exceeded.", e);
+     }
+ }
+
+ /**
+  * Registers every remembered ephemeral path again after the keep-alive failed.
+  * Paths that cannot be registered are put into the failed set for the retry timer.
+  * Stops early whenever a reconnect is being processed.
+  */
+ private void recovery() {
+
+     // the client is processing a reconnect
+     if (cancelKeepAlive) {
+         return;
+     }
+
+     cancelKeepAlive();
+
+     try {
+         // iterate over a snapshot; an empty snapshot simply skips the loop
+         for (String path : new HashSet<String>(registeredPaths)) {
+             try {
+                 // a reconnect started meanwhile: drop the remaining registrations
+                 if (cancelKeepAlive) {
+                     return;
+                 }
+
+                 createEphemeral(path);
+                 failedRegistered.remove(path);
+             } catch (Exception ignored) {
+                 // left for the retry timer
+                 failedRegistered.add(path);
+             }
+         }
+     } catch (Throwable t) {
+         logger.warn("Unexpected error, failed to recover from global lease expired or deadline exceeded.", t);
+     }
+ }
+
+ /**
+  * Deletes the key {@code path} and forgets it as a registered ephemeral path.
+  * The path is removed from the failed set in every case, which cancels its retry.
+  *
+  * @param path the key to delete
+  * @throws IllegalStateException if the deletion fails; the original exception is the cause
+  */
+ public void delete(String path) {
+     try {
+         RetryLoops.<Void>invokeWithRetry(() -> {
+             requiredNotNull(client, failed);
+             client.getKVClient()
+                     .delete(ByteSequence.from(path, UTF_8))
+                     .get(DEFAULT_REQUEST_TIMEOUT, TimeUnit.MILLISECONDS);
+             registeredPaths.remove(path);
+             return null;
+         }, retryPolicy);
+     } catch (Exception e) {
+         throw new IllegalStateException(e.getMessage(), e);
+     } finally {
+         // cancel retry
+         failedRegistered.remove(path);
+     }
+ }
+
+ /**
+  * Splits the backup address list and prefixes every address that carries no
+  * scheme separator with the http scheme.
+  *
+  * @param backupAddress the separator-delimited address list
+  * @return one endpoint per address, in the original order
+  */
+ public String[] endPoints(String backupAddress) {
+     return Arrays.stream(backupAddress.split(Constants.COMMA_SEPARATOR))
+             .map(address -> address.contains(Constants.HTTP_SUBFIX_KEY)
+                     ? address
+                     : Constants.HTTP_KEY + address)
+             .toArray(String[]::new);
+ }
+
+ /**
+  * Waits for the asynchronously built client and starts monitoring the connection state.
+  * <p>
+  * jetcd does not support a connection-change callback yet, so a periodic task polls the
+  * channel state and notifies the {@link ConnectionStateListener} whenever it changes.
+  * This may be replaced if a better option is found.
+  */
+ public void start() {
+     if (started) {
+         return;
+     }
+
+     try {
+         this.client = completableFuture.get(expirePeriod, TimeUnit.SECONDS);
+         this.connectState = isConnected();
+         this.started = true;
+     } catch (Throwable t) {
+         logger.error("Timeout! etcd3 server can not be connected in : " + expirePeriod + " seconds! url: " + url, t);
+
+         // the client may still be built later; adopt it once the future completes
+         completableFuture.whenComplete((c, e) -> {
+             this.client = c;
+             if (e != null) {
+                 logger.error("Got an exception when trying to create etcd3 instance, can not connect to etcd3 server, url: " + url, e);
+             }
+         });
+     }
+
+     try {
+         this.future = reconnectNotify.scheduleWithFixedDelay(this::notifyOnStateChange,
+                 Constants.DEFAULT_REGISTRY_RECONNECT_PERIOD,
+                 Constants.DEFAULT_REGISTRY_RECONNECT_PERIOD,
+                 TimeUnit.MILLISECONDS);
+     } catch (Throwable t) {
+         logger.error("monitor reconnect status failed.", t);
+     }
+ }
+
+ /**
+  * One run of the connection monitor: compares the current connection state with the
+  * last one seen and notifies the listener when it changed.
+  */
+ private void notifyOnStateChange() {
+     // An exception escaping a task scheduled with scheduleWithFixedDelay suppresses all
+     // later runs. isConnected() throws while no client is available and the listener may
+     // throw as well, so everything is guarded to keep the monitor running.
+     try {
+         boolean connected = isConnected();
+         if (connectState != connected) {
+             int notifyState = connected ? StateListener.CONNECTED : StateListener.DISCONNECTED;
+             if (connectionStateListener != null) {
+                 if (connected) {
+                     clearKeepAlive();
+                 }
+                 connectionStateListener.stateChanged(getClient(), notifyState);
+                 cancelKeepAlive = false;
+             }
+             connectState = connected;
+         }
+     } catch (Throwable t) {
+         logger.warn("Failed to check etcd3 connection state, will check again later, url: " + url, t);
+     }
+ }
+
+ /**
+  * Closes the active keep-alive, if any, and clears the reference to it.
+  */
+ private void cancelKeepAlive() {
+     // Work on a snapshot: the volatile field may be set to null by another thread
+     // between the null check and the close() call.
+     CloseableClient current = keepAlive;
+     try {
+         if (current != null) {
+             current.close();
+         }
+     } finally {
+         // help for gc
+         keepAlive = null;
+     }
+ }
+
+ /**
+ * Drops all keep-alive state: flags the keep-alive as cancelled, forgets the
+ * registered and failed paths and closes the active keep-alive.
+ * Called by the connection monitor when the connection comes back.
+ */
+ private synchronized void clearKeepAlive() {
+ // set first so that recovery() and retry() stop registering paths
+ cancelKeepAlive = true;
+ registeredPaths.clear();
+ failedRegistered.clear();
+ cancelKeepAlive();
+ }
+
+ /**
+  * Releases what this wrapper holds: revokes the global lease, stops the connection
+  * monitor and the retry timer, and closes the client. A failure while revoking or
+  * stopping is logged and does not prevent the following steps.
+  */
+ protected void doClose() {
+
+     // stops recovery() and retry() from registering paths during shutdown
+     cancelKeepAlive = true;
+
+     // globalLeaseId keeps its initial value 0 until keepAlive() has granted a lease,
+     // so there is nothing to revoke in that case
+     if (globalLeaseId != 0) {
+         try {
+             revokeLease(this.globalLeaseId);
+         } catch (Exception e) {
+             logger.warn("revoke global lease '" + globalLeaseId + "' failed, registry: " + url, e);
+         }
+     }
+
+     try {
+         // start() schedules the monitor even when it could not set 'started',
+         // so the monitor and its executor are stopped unconditionally
+         started = false;
+         if (future != null) {
+             future.cancel(true);
+         }
+         reconnectNotify.shutdownNow();
+     } catch (Exception e) {
+         logger.warn("stop reconnect Notify failed, registry: " + url, e);
+     }
+
+     try {
+         retryFuture.cancel(true);
+         retryExecutor.shutdownNow();
+     } catch (Throwable t) {
+         logger.warn(t.getMessage(), t);
+     }
+
+     Client current = getClient();
+     if (current != null) {
+         current.close();
+     }
+ }
+
+ /**
+  * Obtains the channel of the given client by reflection: reads the client's
+  * {@code connectionManager} field and invokes its {@code getChannel} method.
+  * jetcd keeps these members private; this may be replaced once jetcd offers an API.
+  *
+  * @param client the client whose channel is wanted
+  * @return the channel returned by the client's connection manager
+  * @throws RuntimeException if the reflective access fails
+  */
+ private ManagedChannel newChannel(Client client) {
+     try {
+         Field managerField = client.getClass().getDeclaredField("connectionManager");
+         // a freshly looked-up Field or Method is never accessible yet
+         managerField.setAccessible(true);
+         Object connectionManager = managerField.get(client);
+
+         Method channelGetter = connectionManager.getClass().getDeclaredMethod("getChannel");
+         channelGetter.setAccessible(true);
+         return (ManagedChannel) channelGetter.invoke(connectionManager);
+     } catch (Exception e) {
+         throw new RuntimeException("Failed to obtain connection channel from " + url.getBackupAddress(), e);
+     }
+ }
+
+ /**
+ * @return the listener notified about connection state changes, or {@code null} when none is set
+ */
+ public ConnectionStateListener getConnectionStateListener() {
+ return connectionStateListener;
+ }
+
+ /**
+ * @param connectionStateListener the listener the connection monitor notifies when the state changes
+ */
+ public void setConnectionStateListener(ConnectionStateListener connectionStateListener) {
+ this.connectionStateListener = connectionStateListener;
+ }
+
+ /**
+ * Throws the given exception when {@code obj} is null.
+ *
+ * @param obj        the reference to check
+ * @param exeception the exception to throw when {@code obj} is null
+ */
+ public static void requiredNotNull(Object obj, RuntimeException exeception) {
+ if (obj == null) {
+ throw exeception;
+ }
+ }
+
+ /**
+  * Tries again to register every path whose ephemeral registration failed.
+  * Runs periodically on the retry timer and gives up as soon as a reconnect
+  * is being processed.
+  */
+ private void retry() {
+     if (failedRegistered.isEmpty()) {
+         return;
+     }
+
+     // work on a snapshot so the shared set can change meanwhile
+     Set<String> pending = new HashSet<String>(failedRegistered);
+     if (pending.isEmpty() || cancelKeepAlive) {
+         return;
+     }
+
+     if (logger.isWarnEnabled()) {
+         logger.warn("Retry failed register(keep alive) for path '" + pending
+                 + "', path size: " + pending.size());
+     }
+
+     try {
+         for (String path : pending) {
+             try {
+                 // is a reconnect in progress?
+                 if (cancelKeepAlive) {
+                     return;
+                 }
+
+                 createEphemeral(path);
+                 failedRegistered.remove(path);
+             } catch (Throwable t) {
+                 logger.warn("Failed to retry register(keep alive) for path '" + path + "', waiting for again, cause: " + t.getMessage(), t);
+             }
+         }
+     } catch (Throwable t) {
+         logger.warn("Failed to retry register(keep alive) for path '" + pending + "', waiting for again, cause: " + t.getMessage(), t);
+     }
+ }
+
+ /**
+  * Callback for changes of the connection state.
+  */
+ public interface ConnectionStateListener {
+
+     /**
+      * Invoked after the connection state has changed.
+      *
+      * @param client   the client whose connection changed
+      * @param newState the state the connection is in now
+      */
+     void stateChanged(Client client, int newState);
+ }
+
+ /**
+ * default request timeout
+ */
+ public static final long DEFAULT_REQUEST_TIMEOUT = obtainRequestTimeout();
+
+ public static final int DEFAULT_INBOUT_SIZE = 100 * 1024 * 1024;
+
+ public static final String GRPC_MAX_INBOUD_SIZE_KEY = "grpc.max.inbound.size";
+
+ public static final String ETCD_REQUEST_TIMEOUT_KEY = "etcd.request.timeout";
+
+ /**
+  * Reads the request timeout in milliseconds from the system property named by
+  * ETCD_REQUEST_TIMEOUT_KEY; falls back to 10 seconds when the property is not set.
+  */
+ private static int obtainRequestTimeout() {
+     String configured = System.getProperty(ETCD_REQUEST_TIMEOUT_KEY);
+     return StringUtils.isNotEmpty(configured)
+             ? Integer.parseInt(configured)
+             : 10 * 1000;
+ }
+}
\ No newline at end of file
diff --git a/dubbo-common/src/test/java/org/apache/dubbo/common/config/ConfigurationUtilsTest.java b/dubbo-remoting/dubbo-remoting-etcd3/src/main/java/org/apache/dubbo/remoting/etcd/jetcd/JEtcdTransporter.java
similarity index 51%
copy from dubbo-common/src/test/java/org/apache/dubbo/common/config/ConfigurationUtilsTest.java
copy to dubbo-remoting/dubbo-remoting-etcd3/src/main/java/org/apache/dubbo/remoting/etcd/jetcd/JEtcdTransporter.java
index 89da34c..5ddec8e 100644
--- a/dubbo-common/src/test/java/org/apache/dubbo/common/config/ConfigurationUtilsTest.java
+++ b/dubbo-remoting/dubbo-remoting-etcd3/src/main/java/org/apache/dubbo/remoting/etcd/jetcd/JEtcdTransporter.java
@@ -14,29 +14,17 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.dubbo.common.config;
+package org.apache.dubbo.remoting.etcd.jetcd;
-import org.apache.dubbo.common.Constants;
+import org.apache.dubbo.common.URL;
+import org.apache.dubbo.remoting.etcd.EtcdClient;
+import org.apache.dubbo.remoting.etcd.EtcdTransporter;
-import org.junit.jupiter.api.Assertions;
-import org.junit.jupiter.api.Test;
+public class JEtcdTransporter implements EtcdTransporter {
-/**
- *
- */
-public class ConfigurationUtilsTest {
-
- @Test
- public void testGetServerShutdownTimeout () {
- System.setProperty(Constants.SHUTDOWN_WAIT_KEY, " 10000");
- Assertions.assertEquals(10000, ConfigurationUtils.getServerShutdownTimeout());
- System.clearProperty(Constants.SHUTDOWN_WAIT_KEY);
+ /**
+ * Creates a new jetcd-based client for the given url.
+ */
+ @Override
+ public EtcdClient connect(URL url) {
+ return new JEtcdClient(url);
}
- @Test
- public void testGetProperty () {
- System.setProperty(Constants.SHUTDOWN_WAIT_KEY, " 10000");
- Assertions.assertEquals("10000", ConfigurationUtils.getProperty(Constants.SHUTDOWN_WAIT_KEY));
- System.clearProperty(Constants.SHUTDOWN_WAIT_KEY);
- }
}
diff --git a/dubbo-remoting/dubbo-remoting-etcd3/src/main/java/org/apache/dubbo/remoting/etcd/jetcd/RetryLoops.java b/dubbo-remoting/dubbo-remoting-etcd3/src/main/java/org/apache/dubbo/remoting/etcd/jetcd/RetryLoops.java
new file mode 100644
index 0000000..cf8617c
--- /dev/null
+++ b/dubbo-remoting/dubbo-remoting-etcd3/src/main/java/org/apache/dubbo/remoting/etcd/jetcd/RetryLoops.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dubbo.remoting.etcd.jetcd;
+
+import io.grpc.Status;
+import org.apache.dubbo.common.logger.Logger;
+import org.apache.dubbo.common.logger.LoggerFactory;
+import org.apache.dubbo.remoting.etcd.RetryPolicy;
+import org.apache.dubbo.remoting.etcd.option.OptionUtil;
+
+import java.util.concurrent.Callable;
+
+/**
+ * Runs a task repeatedly until it succeeds or the retry policy gives up.
+ * One instance tracks the state of a single invocation.
+ */
+public class RetryLoops {
+
+    private final long startTimeMs = System.currentTimeMillis();
+    private boolean isDone = false;
+    private int retriedCount = 0;
+    private Logger logger = LoggerFactory.getLogger(RetryLoops.class);
+
+    /**
+     * Calls the task until a call returns normally, consulting the policy after each failure.
+     *
+     * @param task        the work to perform
+     * @param retryPolicy decides whether a failed call is attempted again
+     * @return the value returned by the successful call
+     * @throws Exception the failure of the task when it is not retried
+     */
+    public static <R> R invokeWithRetry(Callable<R> task, RetryPolicy retryPolicy) throws Exception {
+        RetryLoops loop = new RetryLoops();
+        R outcome = null;
+        while (loop.shouldContinue()) {
+            try {
+                outcome = task.call();
+                loop.complete();
+            } catch (Exception failure) {
+                loop.fireException(failure, retryPolicy);
+            }
+        }
+        return outcome;
+    }
+
+    /**
+     * Handles a failed call: returns normally when the call should be repeated,
+     * otherwise throws the given exception again.
+     *
+     * @param e           the failure of the last call
+     * @param retryPolicy asked only for failures that are considered retryable
+     * @throws Exception {@code e} itself when no retry takes place
+     */
+    public void fireException(Exception e, RetryPolicy retryPolicy) throws Exception {
+
+        if (e instanceof InterruptedException) {
+            // keep the interrupt visible to the caller
+            Thread.currentThread().interrupt();
+        }
+
+        // the retry counter only advances when the failure is retryable
+        boolean retry = isRetryException(e)
+                && retryPolicy.shouldRetry(retriedCount++, System.currentTimeMillis() - startTimeMs, true);
+
+        if (!retry) {
+            throw e;
+        }
+    }
+
+    /**
+     * A failure is retryable when its gRPC status is classified as recoverable.
+     */
+    private boolean isRetryException(Throwable e) {
+        return OptionUtil.isRecoverable(Status.fromThrowable(e));
+    }
+
+    /**
+     * @return {@code true} until {@link #complete()} has been called
+     */
+    public boolean shouldContinue() {
+        return !isDone;
+    }
+
+    /**
+     * Marks the invocation as finished.
+     */
+    public void complete() {
+        isDone = true;
+    }
+
+}
diff --git a/dubbo-common/src/test/java/org/apache/dubbo/common/config/ConfigurationUtilsTest.java b/dubbo-remoting/dubbo-remoting-etcd3/src/main/java/org/apache/dubbo/remoting/etcd/jetcd/RetryNTimes.java
similarity index 52%
copy from dubbo-common/src/test/java/org/apache/dubbo/common/config/ConfigurationUtilsTest.java
copy to dubbo-remoting/dubbo-remoting-etcd3/src/main/java/org/apache/dubbo/remoting/etcd/jetcd/RetryNTimes.java
index 89da34c..7453208 100644
--- a/dubbo-common/src/test/java/org/apache/dubbo/common/config/ConfigurationUtilsTest.java
+++ b/dubbo-remoting/dubbo-remoting-etcd3/src/main/java/org/apache/dubbo/remoting/etcd/jetcd/RetryNTimes.java
@@ -14,29 +14,23 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.dubbo.common.config;
+package org.apache.dubbo.remoting.etcd.jetcd;
-import org.apache.dubbo.common.Constants;
+import org.apache.dubbo.remoting.etcd.AbstractRetryPolicy;
-import org.junit.jupiter.api.Assertions;
-import org.junit.jupiter.api.Test;
+import java.util.concurrent.TimeUnit;
-/**
- *
- */
-public class ConfigurationUtilsTest {
+public class RetryNTimes extends AbstractRetryPolicy {
+
+ private final long sleepMilliseconds;
- @Test
- public void testGetServerShutdownTimeout () {
- System.setProperty(Constants.SHUTDOWN_WAIT_KEY, " 10000");
- Assertions.assertEquals(10000, ConfigurationUtils.getServerShutdownTimeout());
- System.clearProperty(Constants.SHUTDOWN_WAIT_KEY);
+ /**
+  * @param maxRetried passed to the base retry policy
+  * @param sleepTime  the pause between retries, expressed in {@code unit}
+  * @param unit       the unit of {@code sleepTime}
+  */
+ public RetryNTimes(int maxRetried, int sleepTime, TimeUnit unit) {
+     super(maxRetried);
+     // unit.convert(sleepTime, MILLISECONDS) would treat sleepTime as milliseconds and
+     // convert it INTO unit; the field needs the opposite direction.
+     this.sleepMilliseconds = unit.toMillis(sleepTime);
}
- @Test
- public void testGetProperty () {
- System.setProperty(Constants.SHUTDOWN_WAIT_KEY, " 10000");
- Assertions.assertEquals("10000", ConfigurationUtils.getProperty(Constants.SHUTDOWN_WAIT_KEY));
- System.clearProperty(Constants.SHUTDOWN_WAIT_KEY);
+ /**
+ * @return the fixed sleep time set by the constructor, regardless of the arguments
+ */
+ @Override
+ protected long getSleepTime(int retried, long elapsed) {
+ return sleepMilliseconds;
}
}
diff --git a/dubbo-common/src/test/java/org/apache/dubbo/common/config/ConfigurationUtilsTest.java b/dubbo-remoting/dubbo-remoting-etcd3/src/main/java/org/apache/dubbo/remoting/etcd/option/Constants.java
similarity index 51%
copy from dubbo-common/src/test/java/org/apache/dubbo/common/config/ConfigurationUtilsTest.java
copy to dubbo-remoting/dubbo-remoting-etcd3/src/main/java/org/apache/dubbo/remoting/etcd/option/Constants.java
index 89da34c..c935808 100644
--- a/dubbo-common/src/test/java/org/apache/dubbo/common/config/ConfigurationUtilsTest.java
+++ b/dubbo-remoting/dubbo-remoting-etcd3/src/main/java/org/apache/dubbo/remoting/etcd/option/Constants.java
@@ -14,29 +14,17 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.dubbo.common.config;
-
-import org.apache.dubbo.common.Constants;
-
-import org.junit.jupiter.api.Assertions;
-import org.junit.jupiter.api.Test;
+package org.apache.dubbo.remoting.etcd.option;
/**
- *
+ * Etcd registry constants.
*/
-public class ConfigurationUtilsTest {
+public class Constants extends org.apache.dubbo.common.Constants {
+
+ public static final String HTTP_SUBFIX_KEY = "://";
+
+ public static final String HTTP_KEY = "http://";
- @Test
- public void testGetServerShutdownTimeout () {
- System.setProperty(Constants.SHUTDOWN_WAIT_KEY, " 10000");
- Assertions.assertEquals(10000, ConfigurationUtils.getServerShutdownTimeout());
- System.clearProperty(Constants.SHUTDOWN_WAIT_KEY);
- }
+ public static final int DEFAULT_KEEPALIVE_TIMEOUT = DEFAULT_SESSION_TIMEOUT / 2;
- @Test
- public void testGetProperty () {
- System.setProperty(Constants.SHUTDOWN_WAIT_KEY, " 10000");
- Assertions.assertEquals("10000", ConfigurationUtils.getProperty(Constants.SHUTDOWN_WAIT_KEY));
- System.clearProperty(Constants.SHUTDOWN_WAIT_KEY);
- }
}
diff --git a/dubbo-remoting/dubbo-remoting-etcd3/src/main/java/org/apache/dubbo/remoting/etcd/option/OptionUtil.java b/dubbo-remoting/dubbo-remoting-etcd3/src/main/java/org/apache/dubbo/remoting/etcd/option/OptionUtil.java
new file mode 100644
index 0000000..609f289
--- /dev/null
+++ b/dubbo-remoting/dubbo-remoting-etcd3/src/main/java/org/apache/dubbo/remoting/etcd/option/OptionUtil.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dubbo.remoting.etcd.option;
+
+import io.etcd.jetcd.ByteSequence;
+import io.grpc.Status;
+import io.netty.handler.codec.http2.Http2Exception;
+
+import java.util.Arrays;
+
+/**
+ * Helpers for computing etcd key ranges and for classifying gRPC error statuses.
+ */
+public class OptionUtil {
+
+    /**
+     * Range end returned by {@link #prefixEndOf(ByteSequence)} when the prefix
+     * contains no byte that can be incremented.
+     */
+    public static final byte[] NO_PREFIX_END = {0};
+
+    /**
+     * Computes the range end for all keys starting with {@code prefix}: the last byte
+     * that is not 0xff is incremented and everything after it is cut off.
+     *
+     * @param prefix the key prefix
+     * @return the range end, or {@link #NO_PREFIX_END} when the prefix is empty or all 0xff
+     */
+    public static final ByteSequence prefixEndOf(ByteSequence prefix) {
+        byte[] endKey = prefix.getBytes().clone();
+        for (int i = endKey.length - 1; i >= 0; i--) {
+            // Java bytes are signed, so "endKey[i] < 0xff" holds for every byte and a
+            // trailing 0xff would wrap around to 0x00; compare with (byte) 0xff instead.
+            if (endKey[i] != (byte) 0xff) {
+                endKey[i] = (byte) (endKey[i] + 1);
+                return ByteSequence.from(Arrays.copyOf(endKey, i + 1));
+            }
+        }
+
+        return ByteSequence.from(NO_PREFIX_END);
+    }
+
+    /**
+     * Tells whether an operation that failed with the given status is retried.
+     * <p>
+     * NOTE(review): this returns true whenever {@link #isHaltError(Status)} does, i.e. for every
+     * code other than UNAVAILABLE and INTERNAL - confirm that this is the intended classification.
+     *
+     * @param status the status of the failure
+     * @return {@code true} for halt errors, no-leader errors and NOT_FOUND
+     */
+    public static boolean isRecoverable(Status status) {
+        return isHaltError(status)
+                || isNoLeaderError(status)
+                // ephemeral is expired
+                || status.getCode() == Status.Code.NOT_FOUND;
+    }
+
+    /**
+     * @param status the status of the failure
+     * @return {@code true} when the code is neither UNAVAILABLE nor INTERNAL
+     */
+    public static boolean isHaltError(Status status) {
+        // Unavailable codes mean the system will be right back.
+        // (e.g., can't connect, lost leader)
+        // Treat Internal codes as if something failed, leaving the
+        // system in an inconsistent state, but retrying could make progress.
+        // (e.g., failed in middle of send, corrupted frame)
+        return status.getCode() != Status.Code.UNAVAILABLE && status.getCode() != Status.Code.INTERNAL;
+    }
+
+    /**
+     * @param status the status of the failure
+     * @return {@code true} when the code is UNAVAILABLE and the description is
+     *         exactly "etcdserver: no leader"
+     */
+    public static boolean isNoLeaderError(Status status) {
+        return status.getCode() == Status.Code.UNAVAILABLE
+                && "etcdserver: no leader".equals(status.getDescription());
+    }
+
+    /**
+     * Walks the cause chain of {@code e} (excluding {@code e} itself) looking for an
+     * HTTP/2 exception whose error is PROTOCOL_ERROR.
+     *
+     * @param e the failure to inspect; may be {@code null}
+     * @return {@code true} when such a cause is found
+     */
+    public static boolean isProtocolError(Throwable e) {
+        if (e == null) return false;
+        Throwable cause = e.getCause();
+        while (cause != null) {
+            if (cause instanceof Http2Exception) {
+                Http2Exception t = (Http2Exception) cause;
+                if ("PROTOCOL_ERROR".equals(t.error().name())) {
+                    return true;
+                }
+            }
+            cause = cause.getCause();
+        }
+        return false;
+    }
+}
diff --git a/dubbo-remoting/dubbo-remoting-etcd3/src/main/java/org/apache/dubbo/remoting/etcd/support/AbstractEtcdClient.java b/dubbo-remoting/dubbo-remoting-etcd3/src/main/java/org/apache/dubbo/remoting/etcd/support/AbstractEtcdClient.java
new file mode 100644
index 0000000..31752bf
--- /dev/null
+++ b/dubbo-remoting/dubbo-remoting-etcd3/src/main/java/org/apache/dubbo/remoting/etcd/support/AbstractEtcdClient.java
@@ -0,0 +1,194 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dubbo.remoting.etcd.support;
+
+import org.apache.dubbo.common.Constants;
+import org.apache.dubbo.common.URL;
+import org.apache.dubbo.common.logger.Logger;
+import org.apache.dubbo.common.logger.LoggerFactory;
+import org.apache.dubbo.common.utils.ConcurrentHashSet;
+import org.apache.dubbo.common.utils.StringUtils;
+import org.apache.dubbo.remoting.etcd.ChildListener;
+import org.apache.dubbo.remoting.etcd.EtcdClient;
+import org.apache.dubbo.remoting.etcd.StateListener;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+
+public abstract class AbstractEtcdClient<WatcherListener> implements EtcdClient {
+ // Shared EtcdClient logic: path normalization, parent creation and listener bookkeeping; storage calls are left to subclasses.
+ protected static final Logger logger = LoggerFactory.getLogger(AbstractEtcdClient.class);
+
+ private final URL url;
+
+ private final Set<StateListener> stateListeners = new ConcurrentHashSet<>();
+ // path -> (child listener -> implementation-specific watcher created for it)
+ private final ConcurrentMap<String, ConcurrentMap<ChildListener, WatcherListener>> childListeners = new ConcurrentHashMap<>();
+ private final List<String> categories = Arrays.asList(Constants.PROVIDERS_CATEGORY
+ , Constants.CONSUMERS_CATEGORY
+ , Constants.ROUTERS_CATEGORY
+ , Constants.CONFIGURATORS_CATEGORY);
+ private volatile boolean closed = false;
+
+ public AbstractEtcdClient(URL url) {
+ this.url = url;
+ }
+
+ public URL getUrl() {
+ return url;
+ }
+ // Creates a persistent node at the normalized path, creating the parent first when createParentIfAbsent requires it.
+ public void create(String path) {
+ String fixedPath = fixNamespace(path);
+ createParentIfAbsent(fixedPath);
+ doCreatePersistent(fixedPath);
+ }
+ // Creates an ephemeral node at the normalized path and returns the value produced by doCreateEphemeral.
+ public long createEphemeral(String path) {
+ String fixedPath = fixNamespace(path);
+ createParentIfAbsent(fixedPath);
+ return doCreateEphemeral(fixedPath); // was the raw path: the node must use the same normalized path as its parent, as create() does
+ }
+
+ public void addStateListener(StateListener listener) {
+ stateListeners.add(listener);
+ }
+
+ public void removeStateListener(StateListener listener) {
+ stateListeners.remove(listener);
+ }
+
+ public Set<StateListener> getSessionListeners() {
+ return stateListeners;
+ }
+ // Registers listener for path (reusing an already created watcher if present) and returns the result of addChildWatcherListener.
+ public List<String> addChildListener(String path, final ChildListener listener) {
+ ConcurrentMap<ChildListener, WatcherListener> listeners = childListeners.get(path);
+ if (listeners == null) {
+ childListeners.putIfAbsent(path, new ConcurrentHashMap<>());
+ listeners = childListeners.get(path);
+ }
+ WatcherListener targetListener = listeners.get(listener);
+ if (targetListener == null) {
+ listeners.putIfAbsent(listener, createChildWatcherListener(path, listener));
+ targetListener = listeners.get(listener);
+ }
+ return addChildWatcherListener(path, targetListener);
+ }
+ // Returns null when nothing is registered for path; otherwise returns the watcher for listener, creating and storing one if absent.
+ public WatcherListener getChildListener(String path, ChildListener listener) {
+ ConcurrentMap<ChildListener, WatcherListener> listeners = childListeners.get(path);
+ if (listeners == null) {
+ return null;
+ }
+ WatcherListener targetListener = listeners.get(listener);
+ if (targetListener == null) {
+ listeners.putIfAbsent(listener, createChildWatcherListener(path, listener));
+ targetListener = listeners.get(listener);
+ }
+ return targetListener;
+ }
+ // Forgets the watcher mapped to listener for path and, if one existed, hands it to removeChildWatcherListener.
+ public void removeChildListener(String path, ChildListener listener) {
+ ConcurrentMap<ChildListener, WatcherListener> listeners = childListeners.get(path);
+ if (listeners != null) {
+ WatcherListener targetListener = listeners.remove(listener);
+ if (targetListener != null) {
+ removeChildWatcherListener(path, targetListener);
+ }
+ }
+ }
+ // Forwards the state value to every registered StateListener.
+ protected void stateChanged(int state) {
+ for (StateListener sessionListener : getSessionListeners()) {
+ sessionListener.stateChanged(state);
+ }
+ }
+ // Rejects null/empty paths and prepends Constants.PATH_SEPARATOR when the path does not start with '/'.
+ protected String fixNamespace(String path) {
+ if (StringUtils.isEmpty(path)) {
+ throw new IllegalArgumentException("path is required, actual null or ''");
+ }
+ return (path.charAt(0) != '/') ? (Constants.PATH_SEPARATOR + path) : path;
+ }
+ // Path ends with a category: ensure its parent exists. Parent ends with a category: ensure the grandparent exists.
+ protected void createParentIfAbsent(String fixedPath) {
+ int i = fixedPath.lastIndexOf('/');
+ if (i > 0) {
+ String parentPath = fixedPath.substring(0, i);
+ if (categories.stream().anyMatch(fixedPath::endsWith)) {
+ if (!checkExists(parentPath)) {
+ this.doCreatePersistent(parentPath);
+ }
+ } else if (categories.stream().anyMatch(parentPath::endsWith)) {
+ String grandfather = parentPath.substring(0, parentPath.lastIndexOf('/'));
+ if (!checkExists(grandfather)) {
+ this.doCreatePersistent(grandfather);
+ }
+ }
+ }
+ }
+ // Later calls return early once the closed flag is set; a failure inside doClose is logged, not rethrown.
+ public void close() {
+ if (closed) {
+ return;
+ }
+ closed = true;
+ try {
+ doClose();
+ } catch (Throwable t) {
+ logger.warn(t.getMessage(), t);
+ }
+ }
+
+ public abstract void doClose();
+
+ public abstract void doCreatePersistent(String path);
+
+ public abstract long doCreateEphemeral(String path);
+
+ public abstract void delete(String path);
+
+ public abstract boolean checkExists(String path);
+
+ public abstract WatcherListener createChildWatcherListener(String path, ChildListener listener);
+
+ public abstract List<String> addChildWatcherListener(String path, WatcherListener listener);
+
+ public abstract void removeChildWatcherListener(String path, WatcherListener listener);
+
+}
diff --git a/dubbo-remoting/dubbo-remoting-etcd3/src/main/resources/META-INF/dubbo/internal/org.apache.dubbo.remoting.etcd.EtcdTransporter b/dubbo-remoting/dubbo-remoting-etcd3/src/main/resources/META-INF/dubbo/internal/org.apache.dubbo.remoting.etcd.EtcdTransporter
new file mode 100644
index 0000000..d10733a
--- /dev/null
+++ b/dubbo-remoting/dubbo-remoting-etcd3/src/main/resources/META-INF/dubbo/internal/org.apache.dubbo.remoting.etcd.EtcdTransporter
@@ -0,0 +1 @@
+jetcd=org.apache.dubbo.remoting.etcd.jetcd.JEtcdTransporter
\ No newline at end of file
diff --git a/dubbo-remoting/dubbo-remoting-etcd3/src/test/java/org/apache/dubbo/remoting/etcd/jetcd/JEtcdClientTest.java b/dubbo-remoting/dubbo-remoting-etcd3/src/test/java/org/apache/dubbo/remoting/etcd/jetcd/JEtcdClientTest.java
new file mode 100644
index 0000000..19254ab
--- /dev/null
+++ b/dubbo-remoting/dubbo-remoting-etcd3/src/test/java/org/apache/dubbo/remoting/etcd/jetcd/JEtcdClientTest.java
@@ -0,0 +1,260 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dubbo.remoting.etcd.jetcd;
+
+import io.etcd.jetcd.common.exception.ClosedClientException;
+import io.grpc.Status;
+import org.apache.dubbo.common.Constants;
+import org.apache.dubbo.common.URL;
+import org.apache.dubbo.remoting.etcd.ChildListener;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Disabled;
+import org.junit.jupiter.api.Test;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+@Disabled
+public class JEtcdClientTest {
+ // Watch tests for JEtcdClient; setUp points the client at 127.0.0.1:2379 and the whole class is @Disabled.
+ JEtcdClient client;
+
+ @Test
+ public void test_watch_when_create_path() throws InterruptedException {
+ // A child created under the watched path must be reported to the listener by its last path segment.
+ String path = "/dubbo/com.alibaba.dubbo.demo.DemoService/providers";
+ String child = "/dubbo/com.alibaba.dubbo.demo.DemoService/providers/demoService1";
+
+ final CountDownLatch notNotified = new CountDownLatch(1);
+
+ ChildListener childListener = (parent, children) -> {
+ Assertions.assertEquals(1, children.size());
+ Assertions.assertEquals(child.substring(child.lastIndexOf("/") + 1), children.get(0));
+ notNotified.countDown();
+ };
+
+ client.addChildListener(path, childListener);
+
+ client.createEphemeral(child);
+ Assertions.assertTrue(notNotified.await(10, TimeUnit.SECONDS));
+
+ client.removeChildListener(path, childListener);
+ client.delete(child);
+ }
+
+ @Test
+ public void test_watch_when_create_wrong_path() throws InterruptedException {
+ // The node is created under ".../routers" while ".../providers" is watched: no notification may arrive within 1 second.
+ String path = "/dubbo/com.alibaba.dubbo.demo.DemoService/providers";
+ String child = "/dubbo/com.alibaba.dubbo.demo.DemoService/routers/demoService1";
+
+ final CountDownLatch notNotified = new CountDownLatch(1);
+
+ ChildListener childListener = (parent, children) -> {
+ Assertions.assertEquals(1, children.size());
+ Assertions.assertEquals(child, children.get(0));
+ notNotified.countDown();
+ };
+
+ client.addChildListener(path, childListener);
+
+ client.createEphemeral(child);
+ Assertions.assertFalse(notNotified.await(1, TimeUnit.SECONDS));
+
+ client.removeChildListener(path, childListener);
+ client.delete(child);
+ }
+
+ @Test
+ public void test_watch_when_delete_path() throws InterruptedException {
+ // The child exists before the listener is added; deleting it must notify the listener with an empty children list.
+ String path = "/dubbo/com.alibaba.dubbo.demo.DemoService/providers";
+ String child = "/dubbo/com.alibaba.dubbo.demo.DemoService/providers/demoService1";
+
+ final CountDownLatch notNotified = new CountDownLatch(1);
+
+ ChildListener childListener = (parent, children) -> {
+ Assertions.assertEquals(0, children.size());
+ notNotified.countDown();
+ };
+
+ client.createEphemeral(child);
+
+ client.addChildListener(path, childListener);
+ client.delete(child);
+
+ Assertions.assertTrue(notNotified.await(10, TimeUnit.SECONDS));
+ client.removeChildListener(path, childListener);
+ }
+
+ @Test
+ public void test_watch_then_unwatch() throws InterruptedException {
+ // After removeChildListener the later delete must not produce a second notification (counter stays at 1).
+ String path = "/dubbo/com.alibaba.dubbo.demo.DemoService/providers";
+ String child = "/dubbo/com.alibaba.dubbo.demo.DemoService/providers/demoService2";
+
+ final CountDownLatch notNotified = new CountDownLatch(1);
+ final CountDownLatch notTwiceNotified = new CountDownLatch(2);
+
+ final Holder notified = new Holder();
+
+ ChildListener childListener = (parent, children) -> {
+ Assertions.assertEquals(1, children.size());
+ Assertions.assertEquals(child.substring(child.lastIndexOf("/") + 1), children.get(0));
+ notNotified.countDown();
+ notTwiceNotified.countDown();
+ notified.getAndIncrease();
+ };
+
+ client.addChildListener(path, childListener);
+
+ client.createEphemeral(child);
+ Assertions.assertTrue(notNotified.await(15, TimeUnit.SECONDS));
+
+ client.removeChildListener(path, childListener);
+ client.delete(child);
+
+ Assertions.assertFalse(notTwiceNotified.await(5, TimeUnit.SECONDS));
+ Assertions.assertEquals(1, notified.value);
+ client.delete(child); // NOTE(review): child was already deleted a few lines above; this second delete looks redundant
+ }
+
+ @Test
+ public void test_watch_on_unrecoverable_connection() throws InterruptedException {
+ // NOTE(review): the only assertion on the outcome sits in the catch block, so the test passes silently if nothing is thrown.
+ String path = "/dubbo/com.alibaba.dubbo.demo.DemoService/providers";
+ JEtcdClient.EtcdWatcher watcher = null;
+ try {
+ ChildListener childListener = (parent, children) -> {
+ Assertions.assertEquals(path, parent);
+ };
+ client.addChildListener(path, childListener);
+ watcher = client.getChildListener(path, childListener);
+ watcher.watchRequest.onError(Status.ABORTED.withDescription("connection error").asRuntimeException());
+
+ watcher.watchRequest.onNext(watcher.nextRequest());
+ } catch (Exception e) {
+ Assertions.assertTrue(e.getMessage().contains("call was cancelled"));
+ }
+ }
+
+ @Test
+ public void test_watch_on_recoverable_connection() throws InterruptedException {
+ // After the first notification an UNAVAILABLE error is pushed into the watcher; the following delete must still be notified.
+ String path = "/dubbo/com.alibaba.dubbo.demo.DemoService/connection";
+ String child = "/dubbo/com.alibaba.dubbo.demo.DemoService/connection/demoService1";
+
+ final CountDownLatch notNotified = new CountDownLatch(1);
+ final CountDownLatch notTwiceNotified = new CountDownLatch(2);
+ final Holder notified = new Holder();
+ ChildListener childListener = (parent, children) -> {
+ notTwiceNotified.countDown();
+ switch (notified.increaseAndGet()) {
+ case 1: {
+ notNotified.countDown();
+ Assertions.assertTrue(children.size() == 1);
+ Assertions.assertEquals(child.substring(child.lastIndexOf("/") + 1), children.get(0));
+ break;
+ }
+ case 2: {
+ Assertions.assertTrue(children.size() == 0);
+ Assertions.assertEquals(path, parent);
+ break;
+ }
+ default:
+ Assertions.fail("two many callback invoked.");
+ }
+ };
+
+ client.addChildListener(path, childListener);
+ client.createEphemeral(child);
+
+ // make sure first time callback successfully
+ Assertions.assertTrue(notNotified.await(15, TimeUnit.SECONDS));
+
+ // connection error causes client to release all resources including current watcher
+ JEtcdClient.EtcdWatcher watcher = client.getChildListener(path, childListener);
+ watcher.onError(Status.UNAVAILABLE.withDescription("temporary connection issue").asRuntimeException());
+
+ // trigger delete after unavailable
+ client.delete(child);
+ Assertions.assertTrue(notTwiceNotified.await(15, TimeUnit.SECONDS));
+
+ client.removeChildListener(path, childListener);
+ }
+
+ @Test
+ public void test_watch_after_client_closed() throws InterruptedException {
+ // NOTE(review): passes silently if addChildListener does not throw ClosedClientException; confirm that is intended.
+ String path = "/dubbo/com.alibaba.dubbo.demo.DemoService/providers";
+ client.close();
+
+ try {
+ client.addChildListener(path, (parent, children) -> {
+ Assertions.assertEquals(path, parent);
+ });
+ } catch (ClosedClientException e) {
+ Assertions.assertEquals("watch client has been closed, path '" + path + "'", e.getMessage());
+ }
+ }
+
+ @BeforeEach
+ public void setUp() {
+ // timeout in 15 seconds.
+ URL url = URL.valueOf("etcd3://127.0.0.1:2379/com.alibaba.dubbo.registry.RegistryService")
+ .addParameter(Constants.SESSION_TIMEOUT_KEY, 15000);
+
+ client = new JEtcdClient(url);
+ }
+
+ @AfterEach
+ public void tearDown() {
+ client.close();
+ }
+
+ static class Holder {
+ // Counter shared between the listener callback and the test body.
+ volatile int value;
+
+ synchronized int getAndIncrease() {
+ return value++;
+ }
+
+ synchronized int increaseAndGet() {
+ return ++value;
+ }
+ }
+}
diff --git a/dubbo-remoting/dubbo-remoting-etcd3/src/test/java/org/apache/dubbo/remoting/etcd/jetcd/JEtcdClientWrapperTest.java b/dubbo-remoting/dubbo-remoting-etcd3/src/test/java/org/apache/dubbo/remoting/etcd/jetcd/JEtcdClientWrapperTest.java
new file mode 100644
index 0000000..b7d2671
--- /dev/null
+++ b/dubbo-remoting/dubbo-remoting-etcd3/src/test/java/org/apache/dubbo/remoting/etcd/jetcd/JEtcdClientWrapperTest.java
@@ -0,0 +1,186 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dubbo.remoting.etcd.jetcd;
+
+import org.apache.dubbo.common.Constants;
+import org.apache.dubbo.common.URL;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Disabled;
+import org.junit.jupiter.api.Test;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.locks.LockSupport;
+
+import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.spy;
+
+@Disabled
+public class JEtcdClientWrapperTest {
+ // Tests for JEtcdClientWrapper; setUp points the wrapper at 127.0.0.1:2379 and the whole class is @Disabled.
+ JEtcdClientWrapper clientWrapper;
+
+ @Test
+ public void test_path_exists() {
+ String path = "/dubbo/org.apache.dubbo.demo.DemoService/providers";
+ clientWrapper.createPersistent(path);
+ Assertions.assertTrue(clientWrapper.checkExists(path));
+ Assertions.assertFalse(clientWrapper.checkExists(path + "/noneexits"));
+ clientWrapper.delete(path);
+ }
+
+ @Test
+ public void test_create_emerphal_path() {
+ String path = "/dubbo/org.apache.dubbo.demo.DemoService/providers";
+ clientWrapper.createEphemeral(path);
+ Assertions.assertTrue(clientWrapper.checkExists(path));
+ clientWrapper.delete(path);
+ }
+
+ @Test
+ public void test_grant_lease_then_revoke() {
+ long lease = clientWrapper.createLease(1);
+ clientWrapper.revokeLease(lease);
+
+ long newLease = clientWrapper.createLease(1);
+ LockSupport.parkNanos(this, TimeUnit.SECONDS.toNanos(2));
+ // test timeout of lease
+ clientWrapper.revokeLease(newLease);
+ }
+
+ @Test
+ public void test_create_emerphal_path_then_timeout() {
+ String path = "/dubbo/org.apache.dubbo.demo.DemoService/providers";
+ // Uses a spied wrapper whose keepAlive parks for 2 seconds and throws TimeoutException from its second call on.
+ URL url = URL.valueOf("etcd3://127.0.0.1:2379/org.apache.dubbo.registry.RegistryService")
+ .addParameter(Constants.SESSION_TIMEOUT_KEY, 1000);
+
+ JEtcdClientWrapper saved = clientWrapper;
+
+ try {
+ clientWrapper = spy(new JEtcdClientWrapper(url));
+ clientWrapper.start();
+
+ doAnswer(new Answer() {
+ int timeout;
+
+ @Override
+ public Object answer(InvocationOnMock invocation) throws Throwable {
+ LockSupport.parkNanos(this, TimeUnit.SECONDS.toNanos(2));
+ if (timeout++ > 0) {
+ throw new TimeoutException();
+ }
+ return null;
+ }
+ }).when(clientWrapper).keepAlive(anyLong());
+
+ try {
+ clientWrapper.createEphemeral(path);
+ } catch (IllegalStateException ex) {
+ Assertions.assertEquals("failed to create ephereral by path '" + path + "'", ex.getMessage());
+ }
+
+ } finally {
+ clientWrapper.doClose();
+ clientWrapper = saved;
+ }
+ }
+
+ @Test
+ public void test_get_emerphal_children_path() {
+ String path = "/dubbo/org.apache.dubbo.demo.DemoService/providers";
+ String[] children = {
+ "/dubbo/org.apache.dubbo.demo.DemoService/providers/service1"
+ , "/dubbo/org.apache.dubbo.demo.DemoService/providers/service2"
+ , "/dubbo/org.apache.dubbo.demo.DemoService/providers/service3"
+ , "/dubbo/org.apache.dubbo.demo.DemoService/providers/service4"
+ , "/dubbo/org.apache.dubbo.demo.DemoService/providers/service5/exclude"
+ };
+ // The last entry is nested one level deeper; only 4 children are expected below. NOTE(review): it is never deleted in this test.
+ Arrays.stream(children).forEach((child) -> {
+ Assertions.assertFalse(clientWrapper.checkExists(child));
+ clientWrapper.createEphemeral(child);
+ });
+
+ List<String> extected = clientWrapper.getChildren(path);
+
+ Assertions.assertEquals(4, extected.size());
+ extected.stream().forEach((child) -> {
+ boolean found = false;
+ for (int i = 0; i < children.length; ++i) {
+ if (child.equals(children[i])) {
+ found = true;
+ break;
+ }
+ }
+ Assertions.assertTrue(found);
+ clientWrapper.delete(child);
+ });
+ }
+
+ @Test
+ public void test_connect_cluster() {
+ URL url = URL.valueOf("etcd3://127.0.0.1:22379/org.apache.dubbo.registry.RegistryService?backup=127.0.0.1:2379,127.0.0.1:32379");
+ JEtcdClientWrapper clientWrapper = new JEtcdClientWrapper(url); // local wrapper shadows the field and is closed in finally
+ try {
+ clientWrapper.start();
+ String path = "/dubbo/org.apache.dubbo.demo.DemoService/providers";
+ clientWrapper.createEphemeral(path);
+ Assertions.assertTrue(clientWrapper.checkExists(path));
+ Assertions.assertFalse(clientWrapper.checkExists(path + "/noneexits"));
+ clientWrapper.delete(path);
+ } finally {
+ clientWrapper.doClose();
+ }
+ }
+
+ @BeforeEach
+ public void setUp() {
+ URL url = URL.valueOf("etcd3://127.0.0.1:2379/org.apache.dubbo.registry.RegistryService");
+ clientWrapper = new JEtcdClientWrapper(url);
+ clientWrapper.start();
+ }
+
+ @AfterEach
+ public void tearDown() {
+ clientWrapper.doClose();
+ }
+}
diff --git a/dubbo-remoting/pom.xml b/dubbo-remoting/pom.xml
index d646c22..fd17dbf 100644
--- a/dubbo-remoting/pom.xml
+++ b/dubbo-remoting/pom.xml
@@ -38,5 +38,6 @@
<module>dubbo-remoting-http</module>
<module>dubbo-remoting-zookeeper</module>
<module>dubbo-remoting-netty4</module>
+ <module>dubbo-remoting-etcd3</module>
</modules>
</project>
\ No newline at end of file