You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dubbo.apache.org by yi...@apache.org on 2019/03/11 03:56:26 UTC

[incubator-dubbo] branch master updated: [Dubbo-808] Support etcd registry (#3605)

This is an automated email from the ASF dual-hosted git repository.

yiji pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-dubbo.git


The following commit(s) were added to refs/heads/master by this push:
     new 1ee4d84  [Dubbo-808] Support etcd registry (#3605)
1ee4d84 is described below

commit 1ee4d847252a9aa3458409a25ac72b49b4a36fd1
Author: Huxing Zhang <hu...@gmail.com>
AuthorDate: Mon Mar 11 11:55:58 2019 +0800

    [Dubbo-808] Support etcd registry (#3605)
    
    * Merge https://github.com/dubbo/dubbo-registry-etcd into incubator-dubbo
    
    * Add UT to ConfigurationUtilsTest
---
 dubbo-bom/pom.xml                                  |  10 +
 .../common/config/ConfigurationUtilsTest.java      |  27 +
 dubbo-dependencies-bom/pom.xml                     |  16 +
 dubbo-registry/dubbo-registry-etcd3/pom.xml        |  53 ++
 .../apache/dubbo/registry/etcd/EtcdRegistry.java   | 373 +++++++++++
 .../dubbo/registry/etcd/EtcdRegistryFactory.java   |  53 ++
 .../org.apache.dubbo.registry.RegistryFactory      |   1 +
 .../dubbo/registry/etcd/EtcdRegistryTest.java      | 316 +++++++++
 dubbo-registry/pom.xml                             |   1 +
 dubbo-remoting/dubbo-remoting-etcd3/pom.xml        |  52 ++
 .../dubbo/remoting/etcd/AbstractRetryPolicy.java   |  40 +-
 .../apache/dubbo/remoting/etcd/ChildListener.java  |  25 +-
 .../org/apache/dubbo/remoting/etcd/EtcdClient.java | 167 +++++
 .../dubbo/remoting/etcd/EtcdTransporter.java       |  47 ++
 .../apache/dubbo/remoting/etcd/RetryPolicy.java    |  33 +-
 .../apache/dubbo/remoting/etcd/StateListener.java  |  25 +-
 .../dubbo/remoting/etcd/jetcd/JEtcdClient.java     | 400 ++++++++++++
 .../remoting/etcd/jetcd/JEtcdClientWrapper.java    | 706 +++++++++++++++++++++
 .../remoting/etcd/jetcd/JEtcdTransporter.java      |  28 +-
 .../dubbo/remoting/etcd/jetcd/RetryLoops.java      |  95 +++
 .../dubbo/remoting/etcd/jetcd/RetryNTimes.java     |  30 +-
 .../dubbo/remoting/etcd/option/Constants.java      |  28 +-
 .../dubbo/remoting/etcd/option/OptionUtil.java     |  76 +++
 .../remoting/etcd/support/AbstractEtcdClient.java  | 194 ++++++
 .../org.apache.dubbo.remoting.etcd.EtcdTransporter |   1 +
 .../dubbo/remoting/etcd/jetcd/JEtcdClientTest.java | 260 ++++++++
 .../etcd/jetcd/JEtcdClientWrapperTest.java         | 186 ++++++
 dubbo-remoting/pom.xml                             |   1 +
 28 files changed, 3104 insertions(+), 140 deletions(-)

diff --git a/dubbo-bom/pom.xml b/dubbo-bom/pom.xml
index 73e73df..d04495a 100644
--- a/dubbo-bom/pom.xml
+++ b/dubbo-bom/pom.xml
@@ -140,6 +140,11 @@
             </dependency>
             <dependency>
                 <groupId>org.apache.dubbo</groupId>
+                <artifactId>dubbo-remoting-etcd3</artifactId>
+                <version>${project.version}</version>
+            </dependency>
+            <dependency>
+                <groupId>org.apache.dubbo</groupId>
                 <artifactId>dubbo-rpc-api</artifactId>
                 <version>${project.version}</version>
             </dependency>
@@ -220,6 +225,11 @@
             </dependency>
             <dependency>
                 <groupId>org.apache.dubbo</groupId>
+                <artifactId>dubbo-registry-etcd3</artifactId>
+                <version>${project.version}</version>
+            </dependency>
+            <dependency>
+                <groupId>org.apache.dubbo</groupId>
                 <artifactId>dubbo-registry-consul</artifactId>
                 <version>${project.version}</version>
             </dependency>
diff --git a/dubbo-common/src/test/java/org/apache/dubbo/common/config/ConfigurationUtilsTest.java b/dubbo-common/src/test/java/org/apache/dubbo/common/config/ConfigurationUtilsTest.java
index 89da34c..a16a374 100644
--- a/dubbo-common/src/test/java/org/apache/dubbo/common/config/ConfigurationUtilsTest.java
+++ b/dubbo-common/src/test/java/org/apache/dubbo/common/config/ConfigurationUtilsTest.java
@@ -21,6 +21,8 @@ import org.apache.dubbo.common.Constants;
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.Test;
 
+import java.util.Map;
+
 /**
  *
  */
@@ -39,4 +41,29 @@ public class ConfigurationUtilsTest {
         Assertions.assertEquals("10000", ConfigurationUtils.getProperty(Constants.SHUTDOWN_WAIT_KEY));
         System.clearProperty(Constants.SHUTDOWN_WAIT_KEY);
     }
+
+    @Test
+    public void testParseSingleProperties() throws Exception {
+        String p1 = "aaa=bbb";
+        Map<String, String> result = ConfigurationUtils.parseProperties(p1);
+        Assertions.assertEquals(1, result.size());
+        Assertions.assertEquals("bbb", result.get("aaa"));
+    }
+
+    @Test
+    public void testParseMultipleProperties() throws Exception {
+        String p1 = "aaa=bbb\nccc=ddd";
+        Map<String, String> result = ConfigurationUtils.parseProperties(p1);
+        Assertions.assertEquals(2, result.size());
+        Assertions.assertEquals("bbb", result.get("aaa"));
+        Assertions.assertEquals("ddd", result.get("ccc"));
+    }
+
+    @Test
+    public void testEscapedNewLine() throws Exception {
+        String p1 = "dubbo.registry.address=zookeeper://127.0.0.1:2181\\\\ndubbo.protocol.port=20880";
+        Map<String, String> result = ConfigurationUtils.parseProperties(p1);
+        Assertions.assertEquals(1, result.size());
+        Assertions.assertEquals("zookeeper://127.0.0.1:2181\\ndubbo.protocol.port=20880", result.get("dubbo.registry.address"));
+    }
 }
diff --git a/dubbo-dependencies-bom/pom.xml b/dubbo-dependencies-bom/pom.xml
index c9fa963..baa2eed 100644
--- a/dubbo-dependencies-bom/pom.xml
+++ b/dubbo-dependencies-bom/pom.xml
@@ -127,6 +127,7 @@
         <rs_api_version>2.0</rs_api_version>
         <resteasy_version>3.0.19.Final</resteasy_version>
         <tomcat_embed_version>8.5.31</tomcat_embed_version>
+        <jetcd_version>0.3.0</jetcd_version>
         <!-- Log libs -->
         <slf4j_version>1.7.25</slf4j_version>
         <jcl_version>1.2</jcl_version>
@@ -379,6 +380,21 @@
                 <artifactId>tomcat-embed-logging-juli</artifactId>
                 <version>${tomcat_embed_version}</version>
             </dependency>
+            <dependency>
+                <groupId>io.etcd</groupId>
+                <artifactId>jetcd-core</artifactId>
+                <version>${jetcd_version}</version>
+                <exclusions>
+                    <exclusion>
+                        <groupId>io.netty</groupId>
+                        <artifactId>netty-codec-http2</artifactId>
+                    </exclusion>
+                    <exclusion>
+                        <groupId>io.netty</groupId>
+                        <artifactId>netty-handler-proxy</artifactId>
+                    </exclusion>
+                </exclusions>
+            </dependency>
             <!-- Log libs -->
             <dependency>
                 <groupId>org.slf4j</groupId>
diff --git a/dubbo-registry/dubbo-registry-etcd3/pom.xml b/dubbo-registry/dubbo-registry-etcd3/pom.xml
new file mode 100644
index 0000000..00b0ae5
--- /dev/null
+++ b/dubbo-registry/dubbo-registry-etcd3/pom.xml
@@ -0,0 +1,53 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one or more
+  ~ contributor license agreements.  See the NOTICE file distributed with
+  ~ this work for additional information regarding copyright ownership.
+  ~ The ASF licenses this file to You under the Apache License, Version 2.0
+  ~ (the "License"); you may not use this file except in compliance with
+  ~ the License.  You may obtain a copy of the License at
+  ~
+  ~     http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  -->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+    <parent>
+        <artifactId>dubbo-registry</artifactId>
+        <groupId>org.apache.dubbo</groupId>
+        <version>2.7.1-SNAPSHOT</version>
+    </parent>
+
+    <artifactId>dubbo-registry-etcd3</artifactId>
+    <packaging>jar</packaging>
+    <name>${project.artifactId}</name>
+    <description>The etcd3 registry module of Dubbo project</description>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.dubbo</groupId>
+            <artifactId>dubbo-registry-api</artifactId>
+            <version>${project.parent.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.dubbo</groupId>
+            <artifactId>dubbo-common</artifactId>
+            <version>${project.parent.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.dubbo</groupId>
+            <artifactId>dubbo-remoting-etcd3</artifactId>
+            <version>${project.parent.version}</version>
+        </dependency>
+    </dependencies>
+
+
+</project>
\ No newline at end of file
diff --git a/dubbo-registry/dubbo-registry-etcd3/src/main/java/org/apache/dubbo/registry/etcd/EtcdRegistry.java b/dubbo-registry/dubbo-registry-etcd3/src/main/java/org/apache/dubbo/registry/etcd/EtcdRegistry.java
new file mode 100644
index 0000000..504d521
--- /dev/null
+++ b/dubbo-registry/dubbo-registry-etcd3/src/main/java/org/apache/dubbo/registry/etcd/EtcdRegistry.java
@@ -0,0 +1,373 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dubbo.registry.etcd;
+
+import org.apache.dubbo.common.URL;
+import org.apache.dubbo.common.logger.Logger;
+import org.apache.dubbo.common.logger.LoggerFactory;
+import org.apache.dubbo.common.utils.ConcurrentHashSet;
+import org.apache.dubbo.common.utils.UrlUtils;
+import org.apache.dubbo.registry.NotifyListener;
+import org.apache.dubbo.registry.support.FailbackRegistry;
+import org.apache.dubbo.remoting.etcd.ChildListener;
+import org.apache.dubbo.remoting.etcd.EtcdClient;
+import org.apache.dubbo.remoting.etcd.EtcdTransporter;
+import org.apache.dubbo.remoting.etcd.StateListener;
+import org.apache.dubbo.remoting.etcd.option.Constants;
+import org.apache.dubbo.remoting.etcd.option.OptionUtil;
+import org.apache.dubbo.rpc.RpcException;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+
+
+/**
+ * Support for etcd3 registry.
+ */
+public class EtcdRegistry extends FailbackRegistry {
+
+    private final static Logger logger = LoggerFactory.getLogger(EtcdRegistry.class);
+
+    private final static int DEFAULT_ETCD_PORT = 2379;
+
+    private final static String DEFAULT_ROOT = "dubbo";
+
+    private final String root;
+
+    private final Set<String> anyServices = new ConcurrentHashSet<String>();
+
+    private final ConcurrentMap<URL, ConcurrentMap<NotifyListener, ChildListener>> etcdListeners = new ConcurrentHashMap<URL, ConcurrentMap<NotifyListener, ChildListener>>();
+    private final EtcdClient etcdClient;
+    private long expirePeriod;
+
+    public EtcdRegistry(URL url, EtcdTransporter etcdTransporter) {
+        super(url);
+        if (url.isAnyHost()) { // such a url cannot name a concrete registry address
+            throw new IllegalStateException("registry address is invalid, actual: '" + url.getHost() + "'");
+        }
+        String group = url.getParameter(Constants.GROUP_KEY, DEFAULT_ROOT);
+        if (!group.startsWith(Constants.PATH_SEPARATOR)) { // make the group start with the path separator
+            group = Constants.PATH_SEPARATOR + group;
+        }
+        this.root = group; // root under which the service paths of this registry are built
+        etcdClient = etcdTransporter.connect(url);
+        etcdClient.addStateListener(new StateListener() {
+            public void stateChanged(int state) {
+                if (state == CONNECTED) { // on each CONNECTED notification run the inherited recover()
+                    try {
+                        recover();
+                    } catch (Exception e) {
+                        logger.error(e.getMessage(), e); // a failed recovery is only logged, never rethrown
+                    }
+                }
+            }
+        });
+    }
+
+    protected static String appendDefaultPort(String address) {
+        if (address != null && address.length() > 0) {
+            int i = address.indexOf(':');
+            if (i < 0) {
+                return address + ":" + DEFAULT_ETCD_PORT;
+            } else if (Integer.parseInt(address.substring(i + 1)) == 0) {
+                return address.substring(0, i + 1) + DEFAULT_ETCD_PORT;
+            }
+        }
+        return address;
+    }
+
+    @Override
+    public void doRegister(URL url) {
+        try {
+            String path = toUrlPath(url);
+            if (url.getParameter(Constants.DYNAMIC_KEY, true)) { // dynamic is the default when the parameter is absent
+                etcdClient.createEphemeral(path);
+                return;
+            }
+            etcdClient.create(path); // non-dynamic urls go through the plain create() call instead
+        } catch (Throwable e) { // NOTE(review): catches Throwable, so Errors are wrapped into RpcException too
+            throw new RpcException("Failed to register " + url + " to etcd " + getUrl()
+                    + ", cause: " + (OptionUtil.isProtocolError(e)
+                    ? "etcd3 registy maybe not supported yet or etcd3 registry not available."
+                    : e.getMessage()), e);
+        }
+    }
+
+    @Override
+    public void doUnregister(URL url) {
+        try {
+            String path = toUrlPath(url);
+            etcdClient.delete(path);
+        } catch (Throwable e) {
+            throw new RpcException("Failed to unregister " + url + " to etcd " + getUrl() + ", cause: " + e.getMessage(), e);
+        }
+    }
+
+    @Override
+    public void doSubscribe(URL url, NotifyListener listener) {
+        try {
+            if (Constants.ANY_VALUE.equals(url.getServiceInterface())) {
+                String root = toRootPath();
+
+                /**
+                 *  Subscription to all interfaces (service interface equals ANY_VALUE):
+                 *  look up the listener container for this url, creating it at most once via putIfAbsent.
+                 */
+                ConcurrentMap<NotifyListener, ChildListener> listeners =
+                        Optional.ofNullable(etcdListeners.get(url))
+                                .orElseGet(() -> {
+                                    ConcurrentMap<NotifyListener, ChildListener> container, prev;
+                                    prev = etcdListeners.putIfAbsent(url, container = new ConcurrentHashMap<>());
+                                    return prev != null ? prev : container;
+                                });
+
+                /**
+                 *  Look up the root watcher registered for this NotifyListener,
+                 *  creating it at most once via putIfAbsent.
+                 */
+                ChildListener interfaceListener =
+                        Optional.ofNullable(listeners.get(listener))
+                                .orElseGet(() -> {
+                                    ChildListener childListener, prev;
+                                    prev = listeners.putIfAbsent(listener, childListener = new ChildListener() {
+                                        public void childChanged(String parentPath, List<String> currentChildren) {
+                                            /**
+                                             *  etcd3 does not support watching direct children only, so events for
+                                             *  deeper keys arrive here too: when watching /dubbo and /dubbo/interface,
+                                             *  putting the pair {/dubbo/interface/hello hello} also raises an event
+                                             *  on the watched path /dubbo. Children already seen are skipped below.
+                                             */
+                                            for (String child : currentChildren) {
+                                                child = URL.decode(child);
+                                                if (!anyServices.contains(child)) {
+                                                    anyServices.add(child);
+                                                    /**
+                                                     *  A newly seen interface: subscribe to it so that its own
+                                                     *  path (eg: /dubbo/interface) is watched as well.
+                                                     */
+                                                    subscribe(url.setPath(child).addParameters(Constants.INTERFACE_KEY, child,
+                                                            Constants.CHECK_KEY, String.valueOf(false)), listener);
+                                                }
+                                            }
+                                        }
+                                    });
+                                    return prev != null ? prev : childListener;
+                                });
+
+                etcdClient.create(root);
+                /**
+                 *  Register the root watcher; the call returns the interfaces that already exist,
+                 *  and each of them is then subscribed individually (eg: /dubbo/interface).
+                 */
+                List<String> services = etcdClient.addChildListener(root, interfaceListener);
+                for (String service : services) {
+                    service = URL.decode(service);
+                    anyServices.add(service);
+                    subscribe(url.setPath(service).addParameters(Constants.INTERFACE_KEY, service,
+                            Constants.CHECK_KEY, String.valueOf(false)), listener);
+                }
+            } else {
+                List<URL> urls = new ArrayList<URL>();
+                for (String path : toCategoriesPath(url)) {
+
+                    /**
+                     *  Subscription to specific categories (providers, consumers, routers and so on):
+                     *  look up the listener container for this url, creating it at most once via putIfAbsent.
+                     */
+                    ConcurrentMap<NotifyListener, ChildListener> listeners =
+                            Optional.ofNullable(etcdListeners.get(url))
+                                    .orElseGet(() -> {
+                                        ConcurrentMap<NotifyListener, ChildListener> container, prev;
+                                        prev = etcdListeners.putIfAbsent(url,
+                                                container = new ConcurrentHashMap<NotifyListener, ChildListener>());
+                                        return prev != null ? prev : container;
+                                    });
+
+                    /**
+                     *  Look up the category watcher registered for this NotifyListener, creating it
+                     *  at most once; the same watcher is therefore reused for every category path.
+                     */
+                    ChildListener childListener =
+                            Optional.ofNullable(listeners.get(listener))
+                                    .orElseGet(() -> {
+                                        ChildListener watchListener, prev;
+                                        prev = listeners.putIfAbsent(listener, watchListener = new ChildListener() {
+                                            public void childChanged(String parentPath, List<String> currentChildren) {
+                                                EtcdRegistry.this.notify(url, listener,
+                                                        toUrlsWithEmpty(url, parentPath, currentChildren));
+                                            }
+                                        });
+                                        return prev != null ? prev : watchListener;
+                                    });
+
+                    etcdClient.create(path);
+                    /**
+                     *  Register the watcher on the category path (eg: /dubbo/interface/providers,
+                     *  /dubbo/interface/consumers); the call returns the children that already exist.
+                     */
+                    List<String> children = etcdClient.addChildListener(path, childListener);
+                    if (children != null) {
+                        final String watchPath = path; // NOTE(review): unused local - candidate for removal
+                        urls.addAll(toUrlsWithEmpty(url, path, children));
+                    }
+                }
+                notify(url, listener, urls);
+            }
+        } catch (Throwable e) {
+            throw new RpcException("Failed to subscribe " + url + " to etcd " + getUrl()
+                    + ", cause: " + (OptionUtil.isProtocolError(e)
+                    ? "etcd3 registy maybe not supported yet or etcd3 registry not available."
+                    : e.getMessage()), e);
+        }
+    }
+
+    @Override
+    public void doUnsubscribe(URL url, NotifyListener listener) {
+        ConcurrentMap<NotifyListener, ChildListener> listeners = etcdListeners.get(url);
+        if (listeners != null) {
+            ChildListener etcdListener = listeners.get(listener); // NOTE(review): entry is never removed from etcdListeners - confirm intended
+            if (etcdListener != null) {
+                // one url may be watched on several paths, so detach the watcher from each of them
+                for (String path : toUnsubscribedPath(url)) {
+                    etcdClient.removeChildListener(path, etcdListener);
+                }
+            }
+        }
+    }
+
+    @Override
+    public boolean isAvailable() {
+        return etcdClient.isConnected();
+    }
+
+    @Override
+    public void destroy() {
+        super.destroy();
+        try {
+            etcdClient.close();
+        } catch (Exception e) {
+            logger.warn("Failed to close etcd client " + getUrl() + ", cause: " + e.getMessage(), e);
+        }
+    }
+
+    protected String toRootDir() {
+        if (root.startsWith(Constants.PATH_SEPARATOR)) {
+            return root;
+        }
+        return Constants.PATH_SEPARATOR + root;
+    }
+
+    protected String toRootPath() {
+        return root;
+    }
+
+    protected String toServicePath(URL url) {
+        String name = url.getServiceInterface();
+        if (Constants.ANY_VALUE.equals(name)) {
+            return toRootPath();
+        }
+        return toRootDir() + Constants.PATH_SEPARATOR + URL.encode(name);
+    }
+
+    protected String[] toCategoriesPath(URL url) {
+        // ANY_VALUE expands to the four known categories; otherwise use the requested ones (or the default)
+        boolean allCategories = Constants.ANY_VALUE.equals(url.getParameter(Constants.CATEGORY_KEY));
+        String[] categories = allCategories
+                ? new String[]{Constants.PROVIDERS_CATEGORY, Constants.CONSUMERS_CATEGORY,
+                        Constants.ROUTERS_CATEGORY, Constants.CONFIGURATORS_CATEGORY}
+                : url.getParameter(Constants.CATEGORY_KEY, new String[]{Constants.DEFAULT_CATEGORY});
+        String[] paths = new String[categories.length];
+        int index = 0;
+        for (String category : categories) {
+            paths[index++] = toServicePath(url) + Constants.PATH_SEPARATOR + category;
+        }
+        return paths;
+    }
+
+    protected String toCategoryPath(URL url) {
+        return toServicePath(url) + Constants.PATH_SEPARATOR + url.getParameter(Constants.CATEGORY_KEY, Constants.DEFAULT_CATEGORY);
+    }
+
+    protected String toUrlPath(URL url) {
+        return toCategoryPath(url) + Constants.PATH_SEPARATOR + URL.encode(url.toFullString());
+    }
+
+    protected List<String> toUnsubscribedPath(URL url) {
+        List<String> paths = new ArrayList<>();
+        if (Constants.ANY_VALUE.equals(url.getServiceInterface())) {
+            // All-interface subscriptions are watched on toRootPath() in doSubscribe, so the
+            // listener has to be removed from that same path. Re-deriving the path from the
+            // subscriber url's own group parameter could name a different key whenever that
+            // group differs from the one this registry was created with.
+            paths.add(toRootPath());
+            return paths;
+        }
+        // Otherwise doSubscribe added the listener once per category path; return all of them.
+        for (String categoryPath : toCategoriesPath(url)) {
+            paths.add(categoryPath);
+        }
+        return paths;
+    }
+
+    protected List<URL> toUrlsWithoutEmpty(URL consumer, List<String> providers) {
+        List<URL> urls = new ArrayList<URL>();
+        if (providers != null && providers.size() > 0) {
+            for (String provider : providers) {
+                provider = URL.decode(provider);
+                if (provider.contains(Constants.HTTP_SUBFIX_KEY)) {
+                    URL url = URL.valueOf(provider);
+                    if (UrlUtils.isMatch(consumer, url)) {
+                        urls.add(url);
+                    }
+                }
+            }
+        }
+        return urls;
+    }
+
+    protected List<URL> toUrlsWithEmpty(URL consumer, String path, List<String> providers) {
+        List<URL> urls = toUrlsWithoutEmpty(consumer, providers);
+        if (urls == null || urls.isEmpty()) {
+            int i = path.lastIndexOf('/');
+            String category = i < 0 ? path : path.substring(i + 1);
+            URL empty = consumer.setProtocol(Constants.EMPTY_PROTOCOL).addParameter(Constants.CATEGORY_KEY, category);
+            urls.add(empty);
+        }
+        return urls;
+    }
+}
diff --git a/dubbo-registry/dubbo-registry-etcd3/src/main/java/org/apache/dubbo/registry/etcd/EtcdRegistryFactory.java b/dubbo-registry/dubbo-registry-etcd3/src/main/java/org/apache/dubbo/registry/etcd/EtcdRegistryFactory.java
new file mode 100644
index 0000000..187da19
--- /dev/null
+++ b/dubbo-registry/dubbo-registry-etcd3/src/main/java/org/apache/dubbo/registry/etcd/EtcdRegistryFactory.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dubbo.registry.etcd;
+
+import org.apache.dubbo.common.URL;
+import org.apache.dubbo.registry.Registry;
+import org.apache.dubbo.registry.support.AbstractRegistryFactory;
+import org.apache.dubbo.remoting.etcd.EtcdTransporter;
+
+public class EtcdRegistryFactory extends AbstractRegistryFactory {
+
+    private EtcdTransporter etcdTransporter;
+
+    @Override
+    protected Registry createRegistry(URL url) {
+        return new EtcdRegistry(url, etcdTransporter);
+    }
+
+    public void setEtcdTransporter(EtcdTransporter etcdTransporter) {
+        this.etcdTransporter = etcdTransporter;
+    }
+}
diff --git a/dubbo-registry/dubbo-registry-etcd3/src/main/resources/META-INF/dubbo/internal/org.apache.dubbo.registry.RegistryFactory b/dubbo-registry/dubbo-registry-etcd3/src/main/resources/META-INF/dubbo/internal/org.apache.dubbo.registry.RegistryFactory
new file mode 100644
index 0000000..4a6d09c
--- /dev/null
+++ b/dubbo-registry/dubbo-registry-etcd3/src/main/resources/META-INF/dubbo/internal/org.apache.dubbo.registry.RegistryFactory
@@ -0,0 +1 @@
+etcd3=org.apache.dubbo.registry.etcd.EtcdRegistryFactory
\ No newline at end of file
diff --git a/dubbo-registry/dubbo-registry-etcd3/src/test/java/org/apache/dubbo/registry/etcd/EtcdRegistryTest.java b/dubbo-registry/dubbo-registry-etcd3/src/test/java/org/apache/dubbo/registry/etcd/EtcdRegistryTest.java
new file mode 100644
index 0000000..c23c1d2
--- /dev/null
+++ b/dubbo-registry/dubbo-registry-etcd3/src/test/java/org/apache/dubbo/registry/etcd/EtcdRegistryTest.java
@@ -0,0 +1,316 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dubbo.registry.etcd;
+
+import org.apache.dubbo.common.Constants;
+import org.apache.dubbo.common.URL;
+import org.apache.dubbo.common.extension.ExtensionLoader;
+import org.apache.dubbo.common.utils.NetUtils;
+import org.apache.dubbo.registry.NotifyListener;
+import org.apache.dubbo.registry.RegistryFactory;
+import org.apache.dubbo.registry.support.AbstractRegistryFactory;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Disabled;
+import org.junit.jupiter.api.Test;
+
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+
+@Disabled
+public class EtcdRegistryTest {
+
+    String service = "org.apache.dubbo.internal.test.DemoServie";
+    String outerService = "org.apache.dubbo.outer.test.OuterDemoServie";
+    URL serviceUrl = URL.valueOf("dubbo://" + NetUtils.getLocalHost() + "/" + service + "?methods=test1,test2");
+    URL serviceUrl2 = URL.valueOf("dubbo://" + NetUtils.getLocalHost() + "/" + service + "?methods=test1,test2,test3");
+    URL serviceUrl3 = URL.valueOf("dubbo://" + NetUtils.getLocalHost() + "/" + outerService + "?methods=test1,test2");
+    URL registryUrl = URL.valueOf("etcd3://127.0.0.1:2379/org.apache.dubbo.registry.RegistryService");
+    URL consumerUrl = URL.valueOf("dubbo://" + NetUtils.getLocalHost() + ":2018" + "/" + service + "?methods=test1,test2");
+    RegistryFactory registryFactory = ExtensionLoader.getExtensionLoader(RegistryFactory.class).getAdaptiveExtension();
+    EtcdRegistry registry;
+    URL subscribe = new URL(
+            Constants.ADMIN_PROTOCOL, NetUtils.getLocalHost(), 0, "",
+            Constants.INTERFACE_KEY, Constants.ANY_VALUE,
+            Constants.GROUP_KEY, Constants.ANY_VALUE,
+            Constants.VERSION_KEY, Constants.ANY_VALUE,
+            Constants.CLASSIFIER_KEY, Constants.ANY_VALUE,
+            Constants.CATEGORY_KEY, Constants.PROVIDERS_CATEGORY + ","
+            + Constants.CONSUMERS_CATEGORY + ","
+            + Constants.ROUTERS_CATEGORY + ","
+            + Constants.CONFIGURATORS_CATEGORY,
+            Constants.ENABLED_KEY, Constants.ANY_VALUE,
+            Constants.CHECK_KEY, String.valueOf(false));
+
+    @Test
+    public void test_register() {
+
+        registry.register(serviceUrl);
+        Set<URL> registered = registry.getRegistered();
+        Assertions.assertEquals(1, registered.size());
+        Assertions.assertTrue(registered.contains(serviceUrl));
+
+        registry.unregister(serviceUrl);
+    }
+
+    @Test
+    public void test_unregister() {
+
+        registry.register(serviceUrl);
+        Set<URL> registered = registry.getRegistered();
+        Assertions.assertTrue(registered.size() == 1);
+        Assertions.assertTrue(registered.contains(serviceUrl));
+
+        registry.unregister(serviceUrl);
+
+        registered = registry.getRegistered();
+        Assertions.assertTrue(registered.size() == 0);
+    }
+
+    @Test
+    public void test_subscribe() {
+
+        registry.register(serviceUrl);
+
+        final AtomicReference<URL> notifiedUrl = new AtomicReference<URL>();
+        registry.subscribe(consumerUrl, new NotifyListener() {
+            public void notify(List<URL> urls) {
+                notifiedUrl.set(urls.get(0));
+            }
+        });
+        Assertions.assertEquals(serviceUrl.toFullString(), notifiedUrl.get().toFullString());
+        Map<URL, Set<NotifyListener>> arg = registry.getSubscribed();
+        Assertions.assertEquals(consumerUrl, arg.keySet().iterator().next());
+    }
+
+    @Test
+    public void test_subscribe_when_register() throws InterruptedException {
+
+        Assertions.assertTrue(registry.getRegistered().size() == 0);
+        Assertions.assertTrue(registry.getSubscribed().size() == 0);
+
+        CountDownLatch notNotified = new CountDownLatch(2);
+
+        final AtomicReference<URL> notifiedUrl = new AtomicReference<URL>();
+        registry.subscribe(consumerUrl, new NotifyListener() {
+            public void notify(List<URL> urls) {
+                notifiedUrl.set(urls.get(0));
+                notNotified.countDown();
+            }
+        });
+
+        registry.register(serviceUrl);
+
+        Assertions.assertTrue(notNotified.await(15, TimeUnit.SECONDS));
+
+        Assertions.assertEquals(serviceUrl.toFullString(), notifiedUrl.get().toFullString());
+        Map<URL, Set<NotifyListener>> subscribed = registry.getSubscribed();
+        Assertions.assertEquals(consumerUrl, subscribed.keySet().iterator().next());
+    }
+
+    @Test
+    public void test_subscribe_when_register0() throws InterruptedException {
+
+        Assertions.assertTrue(registry.getRegistered().size() == 0);
+        Assertions.assertTrue(registry.getSubscribed().size() == 0);
+
+        CountDownLatch notNotified = new CountDownLatch(3);
+        ConcurrentHashMap<URL, Boolean> notifiedUrls = new ConcurrentHashMap<>();
+        registry.subscribe(consumerUrl, new NotifyListener() {
+            public void notify(List<URL> urls) {
+                if (urls != null && urls.size() > 0) {
+                    if (!urls.get(0).getProtocol().equals("empty")) {
+                        for (Iterator<URL> iterator = urls.iterator(); iterator.hasNext(); ) {
+                            notifiedUrls.put(iterator.next(), true);
+                        }
+                    }
+                }
+
+                notNotified.countDown();
+            }
+        });
+
+        registry.register(serviceUrl);
+        registry.register(serviceUrl2);
+
+        Assertions.assertTrue(notNotified.await(15, TimeUnit.SECONDS));
+
+        Assertions.assertTrue(notifiedUrls.containsKey(serviceUrl));
+        Assertions.assertTrue(notifiedUrls.containsKey(serviceUrl2));
+        Map<URL, Set<NotifyListener>> subscribed = registry.getSubscribed();
+        Assertions.assertEquals(consumerUrl, subscribed.keySet().iterator().next());
+    }
+
+    @Test
+    public void test_subscribe_when_register1() throws InterruptedException {
+
+        Assertions.assertTrue(registry.getRegistered().size() == 0);
+        Assertions.assertTrue(registry.getSubscribed().size() == 0);
+
+        CountDownLatch notNotified = new CountDownLatch(2);
+
+        final AtomicReference<URL> notifiedUrls = new AtomicReference<URL>();
+        registry.subscribe(consumerUrl, new NotifyListener() {
+            public void notify(List<URL> urls) {
+                notifiedUrls.set(urls.get(0));
+                notNotified.countDown();
+            }
+        });
+
+        registry.register(serviceUrl);
+        // register service3 should not trigger notify
+        registry.register(serviceUrl3);
+
+        Assertions.assertTrue(notNotified.await(15, TimeUnit.SECONDS));
+
+        Assertions.assertEquals(serviceUrl, notifiedUrls.get());
+        Map<URL, Set<NotifyListener>> subscribed = registry.getSubscribed();
+        Assertions.assertEquals(consumerUrl, subscribed.keySet().iterator().next());
+    }
+
+    @Test
+    public void test_subscribe_when_register2() throws InterruptedException {
+
+        Assertions.assertTrue(registry.getRegistered().size() == 0);
+        Assertions.assertTrue(registry.getSubscribed().size() == 0);
+
+        CountDownLatch notNotified = new CountDownLatch(3);
+
+        ConcurrentHashMap<URL, Boolean> notifiedUrls = new ConcurrentHashMap<>();
+
+        registry.subscribe(subscribe, new NotifyListener() {
+            public void notify(List<URL> urls) {
+                if (urls != null && urls.size() > 0) {
+                    if (!urls.get(0).getProtocol().equals("empty")) {
+                        for (Iterator<URL> iterator = urls.iterator(); iterator.hasNext(); ) {
+                            notifiedUrls.put(iterator.next(), true);
+                        }
+                        notNotified.countDown();
+                    }
+                }
+            }
+        });
+
+        registry.register(serviceUrl);
+        registry.register(serviceUrl2);
+        // service3's interface is not the same as service2's
+        registry.register(serviceUrl3);
+
+        Assertions.assertTrue(notNotified.await(15, TimeUnit.SECONDS));
+        Assertions.assertTrue(notifiedUrls.size() == 3);
+        Assertions.assertTrue(notifiedUrls.containsKey(serviceUrl));
+        Assertions.assertTrue(notifiedUrls.containsKey(serviceUrl2));
+        Assertions.assertTrue(notifiedUrls.containsKey(serviceUrl3));
+    }
+
+    @Test
+    public void test_unsubscribe() throws InterruptedException {
+
+        Assertions.assertTrue(registry.getRegistered().size() == 0);
+        Assertions.assertTrue(registry.getSubscribed().size() == 0);
+
+        CountDownLatch notNotified = new CountDownLatch(2);
+
+        final AtomicReference<URL> notifiedUrl = new AtomicReference<URL>();
+
+        NotifyListener listener = new NotifyListener() {
+            public void notify(List<URL> urls) {
+                if (urls != null) {
+                    for (Iterator<URL> iterator = urls.iterator(); iterator.hasNext(); ) {
+                        URL url = iterator.next();
+                        if (!url.getProtocol().equals("empty")) {
+                            notifiedUrl.set(url);
+                            notNotified.countDown();
+                        }
+                    }
+                }
+            }
+        };
+        registry.subscribe(consumerUrl, listener);
+        registry.unsubscribe(consumerUrl, listener);
+
+        registry.register(serviceUrl);
+
+        Assertions.assertFalse(notNotified.await(2, TimeUnit.SECONDS));
+        // expect nothing happen
+        Assertions.assertTrue(notifiedUrl.get() == null);
+    }
+
+    @BeforeEach
+    public void setUp() {
+        registry = (EtcdRegistry) registryFactory.getRegistry(registryUrl);
+        Assertions.assertTrue(registry != null);
+        if (!registry.isAvailable()) {
+            AbstractRegistryFactory.destroyAll();
+            registry = (EtcdRegistry) registryFactory.getRegistry(registryUrl);
+        }
+    }
+
+    @AfterEach
+    public void tearDown() {
+
+        registry.unregister(serviceUrl);
+        registry.unregister(serviceUrl2);
+        registry.unregister(serviceUrl3);
+        registry.unregister(subscribe);
+
+        registry.destroy();
+    }
+
+
+}
diff --git a/dubbo-registry/pom.xml b/dubbo-registry/pom.xml
index f74cca0..33a329b 100644
--- a/dubbo-registry/pom.xml
+++ b/dubbo-registry/pom.xml
@@ -35,5 +35,6 @@
         <module>dubbo-registry-zookeeper</module>
         <module>dubbo-registry-redis</module>
         <module>dubbo-registry-consul</module>
+        <module>dubbo-registry-etcd3</module>
     </modules>
 </project>
diff --git a/dubbo-remoting/dubbo-remoting-etcd3/pom.xml b/dubbo-remoting/dubbo-remoting-etcd3/pom.xml
new file mode 100644
index 0000000..768052f
--- /dev/null
+++ b/dubbo-remoting/dubbo-remoting-etcd3/pom.xml
@@ -0,0 +1,52 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one or more
+  ~ contributor license agreements.  See the NOTICE file distributed with
+  ~ this work for additional information regarding copyright ownership.
+  ~ The ASF licenses this file to You under the Apache License, Version 2.0
+  ~ (the "License"); you may not use this file except in compliance with
+  ~ the License.  You may obtain a copy of the License at
+  ~
+  ~     http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  -->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+    <parent>
+        <artifactId>dubbo-remoting</artifactId>
+        <groupId>org.apache.dubbo</groupId>
+        <version>2.7.1-SNAPSHOT</version>
+    </parent>
+    <artifactId>dubbo-remoting-etcd3</artifactId>
+    <packaging>jar</packaging>
+    <name>${project.artifactId}</name>
+    <description>The etcd3 remoting module of Dubbo project</description>
+    <properties>
+        <skip_maven_deploy>false</skip_maven_deploy>
+    </properties>
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.dubbo</groupId>
+            <artifactId>dubbo-common</artifactId>
+            <version>${project.parent.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>io.etcd</groupId>
+            <artifactId>jetcd-core</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>io.netty</groupId>
+            <artifactId>netty-all</artifactId>
+        </dependency>
+    </dependencies>
+
+
+</project>
\ No newline at end of file
diff --git a/dubbo-common/src/test/java/org/apache/dubbo/common/config/ConfigurationUtilsTest.java b/dubbo-remoting/dubbo-remoting-etcd3/src/main/java/org/apache/dubbo/remoting/etcd/AbstractRetryPolicy.java
similarity index 52%
copy from dubbo-common/src/test/java/org/apache/dubbo/common/config/ConfigurationUtilsTest.java
copy to dubbo-remoting/dubbo-remoting-etcd3/src/main/java/org/apache/dubbo/remoting/etcd/AbstractRetryPolicy.java
index 89da34c..f626202 100644
--- a/dubbo-common/src/test/java/org/apache/dubbo/common/config/ConfigurationUtilsTest.java
+++ b/dubbo-remoting/dubbo-remoting-etcd3/src/main/java/org/apache/dubbo/remoting/etcd/AbstractRetryPolicy.java
@@ -14,29 +14,31 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.dubbo.common.config;
+package org.apache.dubbo.remoting.etcd;
 
-import org.apache.dubbo.common.Constants;
+public abstract class AbstractRetryPolicy implements RetryPolicy {
 
-import org.junit.jupiter.api.Assertions;
-import org.junit.jupiter.api.Test;
+    private final int maxRetried;
 
-/**
- *
- */
-public class ConfigurationUtilsTest {
-
-    @Test
-    public void testGetServerShutdownTimeout () {
-        System.setProperty(Constants.SHUTDOWN_WAIT_KEY, " 10000");
-        Assertions.assertEquals(10000, ConfigurationUtils.getServerShutdownTimeout());
-        System.clearProperty(Constants.SHUTDOWN_WAIT_KEY);
+    protected AbstractRetryPolicy(int maxRetried) {
+        this.maxRetried = maxRetried;
     }
 
-    @Test
-    public void testGetProperty () {
-        System.setProperty(Constants.SHUTDOWN_WAIT_KEY, " 10000");
-        Assertions.assertEquals("10000", ConfigurationUtils.getProperty(Constants.SHUTDOWN_WAIT_KEY));
-        System.clearProperty(Constants.SHUTDOWN_WAIT_KEY);
+    public boolean shouldRetry(int retried, long elapsed, boolean sleep) {
+        if (retried < maxRetried) {
+            try {
+                if (sleep) {
+                    Thread.sleep(getSleepTime(retried, elapsed));
+                }
+            } catch (InterruptedException e) {
+                Thread.currentThread().interrupt();
+                return false;
+            }
+            return true;
+        }
+        return false;
     }
+
+    protected abstract long getSleepTime(int retried, long elapsed);
+
 }
diff --git a/dubbo-common/src/test/java/org/apache/dubbo/common/config/ConfigurationUtilsTest.java b/dubbo-remoting/dubbo-remoting-etcd3/src/main/java/org/apache/dubbo/remoting/etcd/ChildListener.java
similarity index 51%
copy from dubbo-common/src/test/java/org/apache/dubbo/common/config/ConfigurationUtilsTest.java
copy to dubbo-remoting/dubbo-remoting-etcd3/src/main/java/org/apache/dubbo/remoting/etcd/ChildListener.java
index 89da34c..46a0af8 100644
--- a/dubbo-common/src/test/java/org/apache/dubbo/common/config/ConfigurationUtilsTest.java
+++ b/dubbo-remoting/dubbo-remoting-etcd3/src/main/java/org/apache/dubbo/remoting/etcd/ChildListener.java
@@ -14,29 +14,12 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.dubbo.common.config;
+package org.apache.dubbo.remoting.etcd;
 
-import org.apache.dubbo.common.Constants;
+import java.util.List;
 
-import org.junit.jupiter.api.Assertions;
-import org.junit.jupiter.api.Test;
+public interface ChildListener {
 
-/**
- *
- */
-public class ConfigurationUtilsTest {
-
-    @Test
-    public void testGetServerShutdownTimeout () {
-        System.setProperty(Constants.SHUTDOWN_WAIT_KEY, " 10000");
-        Assertions.assertEquals(10000, ConfigurationUtils.getServerShutdownTimeout());
-        System.clearProperty(Constants.SHUTDOWN_WAIT_KEY);
-    }
+    void childChanged(String path, List<String> children);
 
-    @Test
-    public void testGetProperty () {
-        System.setProperty(Constants.SHUTDOWN_WAIT_KEY, " 10000");
-        Assertions.assertEquals("10000", ConfigurationUtils.getProperty(Constants.SHUTDOWN_WAIT_KEY));
-        System.clearProperty(Constants.SHUTDOWN_WAIT_KEY);
-    }
 }
diff --git a/dubbo-remoting/dubbo-remoting-etcd3/src/main/java/org/apache/dubbo/remoting/etcd/EtcdClient.java b/dubbo-remoting/dubbo-remoting-etcd3/src/main/java/org/apache/dubbo/remoting/etcd/EtcdClient.java
new file mode 100644
index 0000000..b1e765d
--- /dev/null
+++ b/dubbo-remoting/dubbo-remoting-etcd3/src/main/java/org/apache/dubbo/remoting/etcd/EtcdClient.java
@@ -0,0 +1,167 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dubbo.remoting.etcd;
+
+import org.apache.dubbo.common.URL;
+
+import java.util.List;
+import java.util.concurrent.CancellationException;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+public interface EtcdClient {
+
+    /**
+     * save the specified path to the etcd registry.
+     *
+     * @param path the path to be saved
+     */
+    void create(String path);
+
+    /**
+     * save the specified path to the etcd registry.
+     * if the node disconnects from etcd, it will be deleted
+     * automatically by etcd when the session times out.
+     *
+     * @param path the path to be saved
+     * @return the lease of current path.
+     */
+    long createEphemeral(String path);
+
+    /**
+     * remove the specified path from etcd registry.
+     *
+     * @param path the path to be removed
+     */
+    void delete(String path);
+
+    /**
+     * find direct children directories, excluding the path itself.
+     * Never returns null.
+     *
+     * @param path the path whose direct children are to be found.
+     * @return direct children directories, or an empty list
+     * if the children directory does not exist.
+     */
+    List<String> getChildren(String path);
+
+    /**
+     * register children listener for specified path.
+     *
+     * @param path     the path to be watched for children being added, deleted or updated.
+     * @param listener when children are changed, the listener will be triggered.
+     * @return direct children directories, or an empty list
+     * if the children directory does not exist.
+     */
+    List<String> addChildListener(String path, ChildListener listener);
+
+    /**
+     * find watcher of the children listener for specified path.
+     *
+     * @param path     the path to be watched for children being added, deleted or updated.
+     * @param listener when children are changed, the listener will be triggered.
+     * @return the watcher if found, else null
+     */
+    <T> T getChildListener(String path, ChildListener listener);
+
+    /**
+     * unregister children listener for specified path.
+     *
+     * @param path     the path to be unwatched.
+     * @param listener when children are changed, the listener will be triggered.
+     */
+    void removeChildListener(String path, ChildListener listener);
+
+    /**
+     * support connection notify if connection state was changed.
+     *
+     * @param listener if state changed, listener will be triggered.
+     */
+    void addStateListener(StateListener listener);
+
+    /**
+     * remove connection notify if connection state was changed.
+     *
+     * @param listener the already registered listener to remove; if the
+     *                 listener does not exist, nothing happens.
+     */
+    void removeStateListener(StateListener listener);
+
+    /**
+     * test if current client is active.
+     *
+     * @return true if connection is active else false.
+     */
+    boolean isConnected();
+
+    /**
+     * close current client and release all resources.
+     */
+    void close();
+
+    URL getUrl();
+
+    /***
+     * create new lease from specified second ,it should be waiting if failed.<p>
+     *
+     * @param second lease time (support second only).
+     * @return lease id from etcd
+     */
+    long createLease(long second);
+
+    /***
+     * create new lease from specified ttl second before waiting specified timeout.<p>
+     *
+     * @param ttl lease time (support second only).
+     * @param timeout the maximum time to wait
+     * @param unit the time unit of the timeout argument
+     * @throws CancellationException if this future was cancelled
+     * @throws ExecutionException if this future completed exceptionally
+     * @throws InterruptedException if the current thread was interrupted
+     * while waiting
+     * @throws TimeoutException if the wait timed out
+     * @return lease id from etcd
+     */
+    public long createLease(long ttl, long timeout, TimeUnit unit)
+            throws InterruptedException, ExecutionException, TimeoutException;
+
+    /**
+     * revoke specified lease, any associated path will be removed automatically.
+     *
+     * @param lease to be removed lease
+     */
+    void revokeLease(long lease);
+
+}
diff --git a/dubbo-remoting/dubbo-remoting-etcd3/src/main/java/org/apache/dubbo/remoting/etcd/EtcdTransporter.java b/dubbo-remoting/dubbo-remoting-etcd3/src/main/java/org/apache/dubbo/remoting/etcd/EtcdTransporter.java
new file mode 100644
index 0000000..2c0befb
--- /dev/null
+++ b/dubbo-remoting/dubbo-remoting-etcd3/src/main/java/org/apache/dubbo/remoting/etcd/EtcdTransporter.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dubbo.remoting.etcd;
+
+import org.apache.dubbo.common.Constants;
+import org.apache.dubbo.common.URL;
+import org.apache.dubbo.common.extension.Adaptive;
+import org.apache.dubbo.common.extension.SPI;
+
+@SPI("jetcd")
+public interface EtcdTransporter {
+
+    @Adaptive({Constants.CLIENT_KEY, Constants.TRANSPORTER_KEY})
+    EtcdClient connect(URL url);
+
+}
diff --git a/dubbo-common/src/test/java/org/apache/dubbo/common/config/ConfigurationUtilsTest.java b/dubbo-remoting/dubbo-remoting-etcd3/src/main/java/org/apache/dubbo/remoting/etcd/RetryPolicy.java
similarity index 51%
copy from dubbo-common/src/test/java/org/apache/dubbo/common/config/ConfigurationUtilsTest.java
copy to dubbo-remoting/dubbo-remoting-etcd3/src/main/java/org/apache/dubbo/remoting/etcd/RetryPolicy.java
index 89da34c..b1fc525 100644
--- a/dubbo-common/src/test/java/org/apache/dubbo/common/config/ConfigurationUtilsTest.java
+++ b/dubbo-remoting/dubbo-remoting-etcd3/src/main/java/org/apache/dubbo/remoting/etcd/RetryPolicy.java
@@ -14,29 +14,18 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.dubbo.common.config;
+package org.apache.dubbo.remoting.etcd;
 
-import org.apache.dubbo.common.Constants;
+public interface RetryPolicy {
 
-import org.junit.jupiter.api.Assertions;
-import org.junit.jupiter.api.Test;
+    /**
+     * Whether retry is supported when operation fails.
+     *
+     * @param retried the number of times retried so far
+     * @param elapsed the elapsed time in millisecond since the operation was attempted
+     * @param sleep   whether to sleep before retrying
+     * @return true if the operation should be retried
+     */
+    public boolean shouldRetry(int retried, long elapsed, boolean sleep);
 
-/**
- *
- */
-public class ConfigurationUtilsTest {
-
-    @Test
-    public void testGetServerShutdownTimeout () {
-        System.setProperty(Constants.SHUTDOWN_WAIT_KEY, " 10000");
-        Assertions.assertEquals(10000, ConfigurationUtils.getServerShutdownTimeout());
-        System.clearProperty(Constants.SHUTDOWN_WAIT_KEY);
-    }
-
-    @Test
-    public void testGetProperty () {
-        System.setProperty(Constants.SHUTDOWN_WAIT_KEY, " 10000");
-        Assertions.assertEquals("10000", ConfigurationUtils.getProperty(Constants.SHUTDOWN_WAIT_KEY));
-        System.clearProperty(Constants.SHUTDOWN_WAIT_KEY);
-    }
 }
diff --git a/dubbo-common/src/test/java/org/apache/dubbo/common/config/ConfigurationUtilsTest.java b/dubbo-remoting/dubbo-remoting-etcd3/src/main/java/org/apache/dubbo/remoting/etcd/StateListener.java
similarity index 51%
copy from dubbo-common/src/test/java/org/apache/dubbo/common/config/ConfigurationUtilsTest.java
copy to dubbo-remoting/dubbo-remoting-etcd3/src/main/java/org/apache/dubbo/remoting/etcd/StateListener.java
index 89da34c..4358083 100644
--- a/dubbo-common/src/test/java/org/apache/dubbo/common/config/ConfigurationUtilsTest.java
+++ b/dubbo-remoting/dubbo-remoting-etcd3/src/main/java/org/apache/dubbo/remoting/etcd/StateListener.java
@@ -14,29 +14,14 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.dubbo.common.config;
+package org.apache.dubbo.remoting.etcd;
 
-import org.apache.dubbo.common.Constants;
+public interface StateListener {
 
-import org.junit.jupiter.api.Assertions;
-import org.junit.jupiter.api.Test;
+    int DISCONNECTED = 0;
 
-/**
- *
- */
-public class ConfigurationUtilsTest {
+    int CONNECTED = 1;
 
-    @Test
-    public void testGetServerShutdownTimeout () {
-        System.setProperty(Constants.SHUTDOWN_WAIT_KEY, " 10000");
-        Assertions.assertEquals(10000, ConfigurationUtils.getServerShutdownTimeout());
-        System.clearProperty(Constants.SHUTDOWN_WAIT_KEY);
-    }
+    void stateChanged(int connected);
 
-    @Test
-    public void testGetProperty () {
-        System.setProperty(Constants.SHUTDOWN_WAIT_KEY, " 10000");
-        Assertions.assertEquals("10000", ConfigurationUtils.getProperty(Constants.SHUTDOWN_WAIT_KEY));
-        System.clearProperty(Constants.SHUTDOWN_WAIT_KEY);
-    }
 }
diff --git a/dubbo-remoting/dubbo-remoting-etcd3/src/main/java/org/apache/dubbo/remoting/etcd/jetcd/JEtcdClient.java b/dubbo-remoting/dubbo-remoting-etcd3/src/main/java/org/apache/dubbo/remoting/etcd/jetcd/JEtcdClient.java
new file mode 100644
index 0000000..979caee
--- /dev/null
+++ b/dubbo-remoting/dubbo-remoting-etcd3/src/main/java/org/apache/dubbo/remoting/etcd/jetcd/JEtcdClient.java
@@ -0,0 +1,400 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dubbo.remoting.etcd.jetcd;
+
+import com.google.protobuf.ByteString;
+import io.etcd.jetcd.ByteSequence;
+import io.etcd.jetcd.api.Event;
+import io.etcd.jetcd.api.KeyValue;
+import io.etcd.jetcd.api.WatchCancelRequest;
+import io.etcd.jetcd.api.WatchCreateRequest;
+import io.etcd.jetcd.api.WatchGrpc;
+import io.etcd.jetcd.api.WatchRequest;
+import io.etcd.jetcd.api.WatchResponse;
+import io.etcd.jetcd.common.exception.ClosedClientException;
+import io.grpc.Status;
+import io.grpc.stub.StreamObserver;
+import io.netty.util.internal.ConcurrentSet;
+import org.apache.dubbo.common.Constants;
+import org.apache.dubbo.common.URL;
+import org.apache.dubbo.common.logger.Logger;
+import org.apache.dubbo.common.logger.LoggerFactory;
+import org.apache.dubbo.common.utils.NamedThreadFactory;
+import org.apache.dubbo.remoting.etcd.ChildListener;
+import org.apache.dubbo.remoting.etcd.StateListener;
+import org.apache.dubbo.remoting.etcd.option.OptionUtil;
+import org.apache.dubbo.remoting.etcd.support.AbstractEtcdClient;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Random;
+import java.util.Set;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import static java.util.stream.Collectors.toList;
+import static org.apache.dubbo.remoting.etcd.jetcd.JEtcdClientWrapper.UTF_8;
+
+/**
+ * etcd3 client.
+ */
+public class JEtcdClient extends AbstractEtcdClient<JEtcdClient.EtcdWatcher> {
+
+    private JEtcdClientWrapper clientWrapper;
+    private ScheduledExecutorService reconnectSchedule;
+
+    private int delayPeriod;
+    private Logger logger = LoggerFactory.getLogger(JEtcdClient.class);
+
+    public JEtcdClient(URL url) {
+        super(url);
+        try {
+            clientWrapper = new JEtcdClientWrapper(url);
+            clientWrapper.setConnectionStateListener((client, state) -> {
+                if (state == StateListener.CONNECTED) {
+                    JEtcdClient.this.stateChanged(StateListener.CONNECTED);
+                } else if (state == StateListener.DISCONNECTED) {
+                    JEtcdClient.this.stateChanged(StateListener.DISCONNECTED);
+                }
+            });
+            delayPeriod = getUrl().getParameter(Constants.REGISTRY_RETRY_PERIOD_KEY, Constants.DEFAULT_REGISTRY_RETRY_PERIOD);
+            reconnectSchedule = Executors.newScheduledThreadPool(1,
+                    new NamedThreadFactory("auto-reconnect"));
+            clientWrapper.start();
+        } catch (Exception e) {
+            throw new IllegalStateException(e.getMessage(), e);
+        }
+    }
+
+    @Override
+    public void doCreatePersistent(String path) {
+        clientWrapper.createPersistent(path);
+    }
+
+    @Override
+    public long doCreateEphemeral(String path) {
+        return clientWrapper.createEphemeral(path);
+    }
+
+    @Override
+    public boolean checkExists(String path) {
+        return clientWrapper.checkExists(path);
+    }
+
+    @Override
+    public EtcdWatcher createChildWatcherListener(String path, ChildListener listener) {
+        return new EtcdWatcher(listener);
+    }
+
+    @Override
+    public List<String> addChildWatcherListener(String path, EtcdWatcher etcdWatcher) {
+        return etcdWatcher.forPath(path);
+    }
+
+    @Override
+    public void removeChildWatcherListener(String path, EtcdWatcher etcdWatcher) {
+        etcdWatcher.unwatch();
+    }
+
+    @Override
+    public List<String> getChildren(String path) {
+        return clientWrapper.getChildren(path);
+    }
+
+    @Override
+    public boolean isConnected() {
+        return clientWrapper.isConnected();
+    }
+
+    @Override
+    public long createLease(long second) {
+        return clientWrapper.createLease(second);
+    }
+
+    @Override
+    public long createLease(long ttl, long timeout, TimeUnit unit)
+            throws InterruptedException, ExecutionException, TimeoutException {
+        return clientWrapper.createLease(ttl, timeout, unit);
+    }
+
+    @Override
+    public void delete(String path) {
+        clientWrapper.delete(path);
+    }
+
+    @Override
+    public void revokeLease(long lease) {
+        clientWrapper.revokeLease(lease);
+    }
+
+    /**
+     * Stops the watch reconnect scheduler and then delegates to
+     * {@code clientWrapper.doClose()}. The wrapper is closed even when
+     * stopping the scheduler fails.
+     */
+    @Override
+    public void doClose() {
+        try {
+            reconnectSchedule.shutdownNow();
+        } catch (Exception e) {
+            // best effort: report the failure instead of silently dropping it,
+            // the wrapper is still closed in the finally block.
+            logger.warn("Failed to shut down the etcd watch reconnect scheduler", e);
+        } finally {
+            clientWrapper.doClose();
+        }
+    }
+
+    public class EtcdWatcher implements StreamObserver<WatchResponse> {
+
+        protected WatchGrpc.WatchStub watchStub;
+        protected StreamObserver<WatchRequest> watchRequest;
+        protected long watchId;
+        protected String path;
+        protected Throwable throwable;
+        protected Set<String> urls = new ConcurrentSet<>();
+        private ChildListener listener;
+
+        public EtcdWatcher(ChildListener listener) {
+            this.listener = listener;
+        }
+
+        /**
+         * Handles a watch response: remembers the watch id, applies every PUT / DELETE
+         * event to the cached child set and notifies the listener once when at least
+         * one event actually changed that set.
+         *
+         * @param response the watch response delivered by the gRPC stream
+         */
+        @Override
+        public void onNext(WatchResponse response) {
+
+            // skip responses that arrive while the client is not connected.
+            if (!isConnected()) {
+                return;
+            }
+
+            watchId = response.getWatchId();
+
+            // Work on a snapshot: unwatch() sets the field to null, and if that happens
+            // between the null check and the notification below the callback would
+            // otherwise fail with a NullPointerException.
+            final ChildListener currentListener = listener;
+            if (currentListener == null) {
+                return;
+            }
+
+            int modified = 0;
+            for (Event event : response.getEventsList()) {
+                String service;
+                switch (event.getType()) {
+                    case PUT:
+                        // count the event only when the child was not cached yet
+                        if ((service = find(event)) != null && safeUpdate(service, true)) {
+                            modified++;
+                        }
+                        break;
+                    case DELETE:
+                        // count the event only when the child was actually cached
+                        if ((service = find(event)) != null && safeUpdate(service, false)) {
+                            modified++;
+                        }
+                        break;
+                    default:
+                        break;
+                }
+            }
+
+            if (modified > 0) {
+                // hand out a copy so the listener never sees later mutations of the cache
+                currentListener.childChanged(path, new ArrayList<>(urls));
+            }
+        }
+
+        @Override
+        public void onError(Throwable e) {
+            tryReconnect(e);
+        }
+
+        public void unwatch() {
+
+            // prevents grpc on sending watchResponse to a closed watch client.
+            if (!isConnected()) {
+                return;
+            }
+
+            try {
+                this.listener = null;
+                if (watchRequest != null) {
+                    WatchCancelRequest watchCancelRequest =
+                            WatchCancelRequest.newBuilder().setWatchId(watchId).build();
+                    WatchRequest cancelRequest = WatchRequest.newBuilder()
+                            .setCancelRequest(watchCancelRequest).build();
+                    this.watchRequest.onNext(cancelRequest);
+                }
+            } catch (Exception ignored) {
+                logger.warn("Failed to cancel watch for path '" + path + "'", ignored);
+            }
+        }
+
+        public List<String> forPath(String path) {
+
+            if (!isConnected()) {
+                throw new ClosedClientException("watch client has been closed, path '" + path + "'");
+            }
+
+            if (this.path != null) {
+                if (this.path.equals(path)) {
+                    return clientWrapper.getChildren(path);
+                }
+                unwatch();
+            }
+
+            this.watchStub = WatchGrpc.newStub(clientWrapper.getChannel());
+            this.watchRequest = watchStub.watch(this);
+            this.path = path;
+            this.watchRequest.onNext(nextRequest());
+
+            List<String> children = clientWrapper.getChildren(path);
+
+            /**
+             * caching the current service
+             */
+            if (!children.isEmpty()) {
+                this.urls.addAll(filterChildren(children));
+            }
+
+            return new ArrayList<>(urls);
+        }
+
+        private boolean safeUpdate(String service, boolean add) {
+            synchronized (this) {
+                /**
+                 * If the collection already contains the specified service, do nothing
+                 */
+                return add ? this.urls.add(service) : this.urls.remove(service);
+            }
+        }
+
+        private String find(Event event) {
+            KeyValue keyValue = event.getKv();
+            String key = keyValue.getKey().toStringUtf8();
+
+            int len = path.length(), index = len, count = 0;
+            if (key.length() >= index) {
+                for (; (index = key.indexOf(Constants.PATH_SEPARATOR, index)) != -1; ++index) {
+                    if (count++ > 1) break;
+                }
+            }
+
+            /**
+             * if children changed , we should refresh invokers
+             */
+            if (count == 1) {
+                /**
+                 * remove prefix
+                 */
+                return key.substring(len + 1);
+            }
+
+            return null;
+        }
+
+        /**
+         * Reduces the given keys to the direct children of the watched path.
+         * A key is kept when it is longer than the path and contains exactly one
+         * path separator at or after the end of the path; kept keys are returned
+         * without their first {@code path.length() + 1} characters.
+         *
+         * @param children keys to filter, may be {@code null}
+         * @return the shortened child keys; an empty list when {@code children} is
+         * {@code null}, or {@code children} itself when it is empty
+         */
+        private List<String> filterChildren(List<String> children) {
+            if (children == null) {
+                return Collections.emptyList();
+            }
+            if (children.isEmpty()) {
+                return children;
+            }
+            final int len = path.length();
+            // Sequential on purpose: each element only needs a short string scan, so a
+            // parallel stream would add fork/join overhead without any benefit.
+            return children.stream()
+                    .filter(child -> {
+                        int index = len, count = 0;
+                        if (child.length() > len) {
+                            // count separators after the path prefix, giving up once there are too many
+                            for (; (index = child.indexOf(Constants.PATH_SEPARATOR, index)) != -1; ++index) {
+                                if (count++ > 1) break;
+                            }
+                        }
+                        return count == 1;
+                    })
+                    .map(child -> child.substring(len + 1))
+                    .collect(toList());
+        }
+
+        /**
+         * create new watching request for current path.
+         */
+        protected WatchRequest nextRequest() {
+
+            WatchCreateRequest.Builder builder = WatchCreateRequest.newBuilder()
+                    .setKey(ByteString.copyFromUtf8(path))
+                    .setRangeEnd(ByteString.copyFrom(
+                            OptionUtil.prefixEndOf(ByteSequence.from(path, UTF_8)).getBytes()))
+                    .setProgressNotify(true);
+
+            return WatchRequest.newBuilder().setCreateRequest(builder).build();
+        }
+
+        public void tryReconnect(Throwable e) {
+
+            this.throwable = e;
+
+            logger.error("watcher client has error occurred, current path '" + path + "'", e);
+
+            // prevents grpc on sending error to a closed watch client.
+            if (!isConnected()) {
+                return;
+            }
+
+
+            Status status = Status.fromThrowable(e);
+            // system may be recover later, current connect won't be lost
+            if (OptionUtil.isHaltError(status) || OptionUtil.isNoLeaderError(status)) {
+                reconnectSchedule.schedule(this::reconnect, new Random().nextInt(delayPeriod), TimeUnit.MILLISECONDS);
+                return;
+            }
+            // reconnect with a delay; avoiding immediate retry on a long connection downtime.
+            reconnectSchedule.schedule(this::reconnect, new Random().nextInt(delayPeriod), TimeUnit.MILLISECONDS);
+        }
+
+        protected synchronized void reconnect() {
+            this.closeWatchRequest();
+            this.recreateWatchRequest();
+        }
+
+        protected void recreateWatchRequest() {
+            if (watchRequest == null) {
+                this.watchStub = WatchGrpc.newStub(clientWrapper.getChannel());
+                this.watchRequest = watchStub.watch(this);
+            }
+            this.watchRequest.onNext(nextRequest());
+            this.throwable = null;
+            logger.warn("watch client retried connect for path '" + path + "', connection status : " + isConnected());
+        }
+
+        protected void closeWatchRequest() {
+            if (this.watchRequest == null) {
+                return;
+            }
+            this.watchRequest.onCompleted();
+            this.watchRequest = null;
+        }
+
+        @Override
+        public void onCompleted() {
+            // do not touch this method, if you want terminate this stream.
+        }
+    }
+}
diff --git a/dubbo-remoting/dubbo-remoting-etcd3/src/main/java/org/apache/dubbo/remoting/etcd/jetcd/JEtcdClientWrapper.java b/dubbo-remoting/dubbo-remoting-etcd3/src/main/java/org/apache/dubbo/remoting/etcd/jetcd/JEtcdClientWrapper.java
new file mode 100644
index 0000000..e563cc2
--- /dev/null
+++ b/dubbo-remoting/dubbo-remoting-etcd3/src/main/java/org/apache/dubbo/remoting/etcd/jetcd/JEtcdClientWrapper.java
@@ -0,0 +1,706 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dubbo.remoting.etcd.jetcd;
+
+import io.etcd.jetcd.ByteSequence;
+import io.etcd.jetcd.Client;
+import io.etcd.jetcd.ClientBuilder;
+import io.etcd.jetcd.CloseableClient;
+import io.etcd.jetcd.Observers;
+import io.etcd.jetcd.common.exception.ErrorCode;
+import io.etcd.jetcd.common.exception.EtcdException;
+import io.etcd.jetcd.lease.LeaseKeepAliveResponse;
+import io.etcd.jetcd.options.GetOption;
+import io.etcd.jetcd.options.PutOption;
+import io.grpc.ConnectivityState;
+import io.grpc.ManagedChannel;
+import io.grpc.stub.StreamObserver;
+import io.grpc.util.RoundRobinLoadBalancerFactory;
+import org.apache.dubbo.common.URL;
+import org.apache.dubbo.common.logger.Logger;
+import org.apache.dubbo.common.logger.LoggerFactory;
+import org.apache.dubbo.common.utils.ConcurrentHashSet;
+import org.apache.dubbo.common.utils.NamedThreadFactory;
+import org.apache.dubbo.common.utils.StringUtils;
+import org.apache.dubbo.remoting.etcd.RetryPolicy;
+import org.apache.dubbo.remoting.etcd.StateListener;
+import org.apache.dubbo.remoting.etcd.option.Constants;
+
+import java.lang.reflect.Field;
+import java.lang.reflect.Method;
+import java.nio.charset.Charset;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Consumer;
+
+import static java.util.stream.Collectors.toList;
+
+public class JEtcdClientWrapper {
+
+    private Logger logger = LoggerFactory.getLogger(JEtcdClientWrapper.class);
+
+    private final URL url;
+    private volatile Client client;
+    private volatile boolean started = false;
+    private volatile boolean connectState = false;
+    private ScheduledFuture future;
+    private ScheduledExecutorService reconnectNotify;
+    private AtomicReference<ManagedChannel> channel;
+
+    private ConnectionStateListener connectionStateListener;
+
+    private long expirePeriod;
+
+    private CompletableFuture<Client> completableFuture;
+
+    private RetryPolicy retryPolicy;
+
+    private RuntimeException failed;
+
+    private final ScheduledFuture<?> retryFuture;
+    private final ScheduledExecutorService retryExecutor = Executors.newScheduledThreadPool(1, new NamedThreadFactory("Etcd3RegistryKeepAliveFailedRetryTimer", true));
+
+    private final Set<String> failedRegistered = new ConcurrentHashSet<String>();
+
+    private final Set<String> registeredPaths = new ConcurrentHashSet<>();
+    private volatile CloseableClient keepAlive = null;
+
+    /**
+     * Support temporary nodes to reuse the same lease
+     */
+    private volatile long globalLeaseId;
+
+    private volatile boolean cancelKeepAlive = false;
+
+    public static final Charset UTF_8 = Charset.forName("UTF-8");
+
+    /**
+     * Creates the wrapper for the given url: derives the expire period from the
+     * session timeout parameter divided by 1000, starts building the etcd client
+     * asynchronously and schedules the periodic {@code retry()} task.
+     *
+     * @param url the url carrying the etcd connection parameters
+     */
+    public JEtcdClientWrapper(URL url) {
+        this.url = url;
+
+        long configuredPeriod = url.getParameter(Constants.SESSION_TIMEOUT_KEY, Constants.DEFAULT_KEEPALIVE_TIMEOUT) / 1000;
+        // a non-positive value falls back to the default keep-alive timeout
+        this.expirePeriod = configuredPeriod > 0 ? configuredPeriod : Constants.DEFAULT_KEEPALIVE_TIMEOUT / 1000;
+
+        this.channel = new AtomicReference<>();
+        this.completableFuture = CompletableFuture.supplyAsync(() -> prepareClient(url));
+        this.reconnectNotify = Executors.newScheduledThreadPool(1,
+                new NamedThreadFactory("reconnectNotify", true));
+        this.retryPolicy = new RetryNTimes(1, 1000, TimeUnit.MILLISECONDS);
+
+        this.failed = new IllegalStateException("Etcd3 registry is not connected yet, url:" + url);
+
+        int retryIntervalMillis = url.getParameter(Constants.REGISTRY_RETRY_PERIOD_KEY, Constants.DEFAULT_REGISTRY_RETRY_PERIOD);
+        this.retryFuture = retryExecutor.scheduleWithFixedDelay(() -> {
+            try {
+                retry();
+            } catch (Throwable t) {
+                // never let a failure escape, otherwise the periodic task would stop running
+                logger.error("Unexpected error occur at failed retry, cause: " + t.getMessage(), t);
+            }
+        }, retryIntervalMillis, retryIntervalMillis, TimeUnit.MILLISECONDS);
+    }
+
+    /**
+     * Builds the etcd client for the backup address of the given url, using the
+     * round-robin load balancer and the configured maximum inbound message size.
+     * The size defaults to {@code DEFAULT_INBOUT_SIZE} and can be overridden with the
+     * system property named by {@code GRPC_MAX_INBOUD_SIZE_KEY}.
+     *
+     * @param url the url providing the etcd endpoints
+     * @return the newly built client
+     * @throws NumberFormatException if the system property is set but is not an integer
+     */
+    private Client prepareClient(URL url) {
+
+        int maxInboudSize = DEFAULT_INBOUT_SIZE;
+        // Read the property once so the emptiness check and the parsing see the same
+        // value; surrounding whitespace is tolerated.
+        String configuredSize = System.getProperty(GRPC_MAX_INBOUD_SIZE_KEY);
+        if (StringUtils.isNotEmpty(configuredSize)) {
+            maxInboudSize = Integer.parseInt(configuredSize.trim());
+        }
+
+        ClientBuilder clientBuilder = Client.builder()
+                .loadBalancerFactory(RoundRobinLoadBalancerFactory.getInstance())
+                .endpoints(endPoints(url.getBackupAddress()))
+                .maxInboundMessageSize(maxInboudSize);
+
+        return clientBuilder.build();
+    }
+
+    public Client getClient() {
+        return client;
+    }
+
+    /**
+     * try to get current connected channel.
+     *
+     * @return connected channel.
+     */
+    public ManagedChannel getChannel() {
+        if (channel.get() == null || (channel.get().isShutdown() || channel.get().isTerminated())) {
+            channel.set(newChannel(client));
+        }
+        return channel.get();
+    }
+
+    /**
+     * find direct children directory, excluding path self,
+     * Never return null.
+     *
+     * @param path the path to be found direct children.
+     * @return direct children directory, contains zero element
+     * list if children directory not exists.
+     */
+    public List<String> getChildren(String path) {
+        try {
+            return RetryLoops.invokeWithRetry(
+                    new Callable<List<String>>() {
+                        @Override
+                        public List<String> call() throws Exception {
+                            requiredNotNull(client, failed);
+                            int len = path.length();
+                            return client.getKVClient()
+                                    .get(ByteSequence.from(path, UTF_8),
+                                            GetOption.newBuilder().withPrefix(ByteSequence.from(path, UTF_8)).build())
+                                    .get(DEFAULT_REQUEST_TIMEOUT, TimeUnit.MILLISECONDS)
+                                    .getKvs().stream().parallel()
+                                    .filter(pair -> {
+                                        String key = pair.getKey().toString(UTF_8);
+                                        int index = len, count = 0;
+                                        if (key.length() > len) {
+                                            for (; (index = key.indexOf(Constants.PATH_SEPARATOR, index)) != -1; ++index) {
+                                                if (count++ > 1) break;
+                                            }
+                                        }
+                                        return count == 1;
+                                    })
+                                    .map(pair -> pair.getKey().toString(UTF_8))
+                                    .collect(toList());
+                        }
+                    }, retryPolicy);
+        } catch (Exception e) {
+            throw new IllegalStateException(e.getMessage(), e);
+        }
+    }
+
+    public boolean isConnected() {
+        return ConnectivityState.READY == (getChannel().getState(false))
+                || ConnectivityState.IDLE == (getChannel().getState(false));
+    }
+
+    public long createLease(long second) {
+        try {
+            return RetryLoops.invokeWithRetry(
+                    new Callable<Long>() {
+                        @Override
+                        public Long call() throws Exception {
+                            requiredNotNull(client, failed);
+                            return client.getLeaseClient()
+                                    .grant(second)
+                                    .get(DEFAULT_REQUEST_TIMEOUT, TimeUnit.MILLISECONDS)
+                                    .getID();
+                        }
+                    }, retryPolicy);
+        } catch (Exception e) {
+            throw new IllegalStateException(e.getMessage(), e);
+        }
+    }
+
+    public void revokeLease(long lease) {
+        try {
+            RetryLoops.invokeWithRetry(
+                    new Callable<Void>() {
+                        @Override
+                        public Void call() throws Exception {
+                            requiredNotNull(client, failed);
+                            client.getLeaseClient()
+                                    .revoke(lease)
+                                    .get(DEFAULT_REQUEST_TIMEOUT, TimeUnit.MILLISECONDS);
+                            return null;
+                        }
+                    }, retryPolicy);
+        } catch (Exception e) {
+            throw new IllegalStateException(e.getMessage(), e);
+        }
+    }
+
+    public long createLease(long ttl, long timeout, TimeUnit unit)
+            throws InterruptedException, ExecutionException, TimeoutException {
+
+        if (timeout <= 0) {
+            return createLease(ttl);
+        }
+
+        requiredNotNull(client, failed);
+        return client.getLeaseClient()
+                .grant(ttl)
+                .get(timeout, unit).getID();
+    }
+
+
+    /**
+     * try to check if path exists.
+     */
+    public boolean checkExists(String path) {
+        try {
+            return RetryLoops.invokeWithRetry(
+                    new Callable<Boolean>() {
+                        @Override
+                        public Boolean call() throws Exception {
+                            requiredNotNull(client, failed);
+                            return client.getKVClient()
+                                    .get(ByteSequence.from(path, UTF_8), GetOption.newBuilder().withCountOnly(true).build())
+                                    .get(DEFAULT_REQUEST_TIMEOUT, TimeUnit.MILLISECONDS)
+                                    .getCount() > 0;
+                        }
+                    }, retryPolicy);
+        } catch (Exception e) {
+            throw new IllegalStateException(e.getMessage(), e);
+        }
+    }
+
+    /**
+     * only internal use only, maybe change in the future
+     */
+    protected Long find(String path) {
+        try {
+            return RetryLoops.invokeWithRetry(
+                    new Callable<Long>() {
+                        @Override
+                        public Long call() throws Exception {
+                            requiredNotNull(client, failed);
+                            return client.getKVClient()
+                                    .get(ByteSequence.from(path, UTF_8))
+                                    .get(DEFAULT_REQUEST_TIMEOUT, TimeUnit.MILLISECONDS)
+                                    .getKvs().stream()
+                                    .mapToLong(keyValue -> Long.valueOf(keyValue.getValue().toString(UTF_8)))
+                                    .findFirst().getAsLong();
+                        }
+                    }, retryPolicy);
+        } catch (Exception e) {
+            throw new IllegalStateException(e.getMessage(), e);
+        }
+    }
+
+    public void createPersistent(String path) {
+        try {
+            RetryLoops.invokeWithRetry(
+                    new Callable<Void>() {
+                        @Override
+                        public Void call() throws Exception {
+                            requiredNotNull(client, failed);
+                            client.getKVClient()
+                                    .put(ByteSequence.from(path, UTF_8),
+                                            ByteSequence.from(String.valueOf(path.hashCode()), UTF_8))
+                                    .get(DEFAULT_REQUEST_TIMEOUT, TimeUnit.MILLISECONDS);
+                            return null;
+                        }
+                    }, retryPolicy);
+        } catch (Exception e) {
+            throw new IllegalStateException(e.getMessage(), e);
+        }
+    }
+
+    /**
+     * Creates a new ephemeral path and saves it to etcd.
+     * If the node disconnects from etcd, the path is deleted
+     * automatically by etcd when the session times out.
+     *
+     * @param path the path to be saved
+     * @return the lease of current path.
+     */
+    public long createEphemeral(String path) {
+        try {
+            return RetryLoops.invokeWithRetry(
+                    new Callable<Long>() {
+                        @Override
+                        public Long call() throws Exception {
+                            requiredNotNull(client, failed);
+
+                            keepAlive();
+                            registeredPaths.add(path);
+                            client.getKVClient()
+                                    .put(ByteSequence.from(path, UTF_8)
+                                            , ByteSequence.from(String.valueOf(globalLeaseId), UTF_8)
+                                            , PutOption.newBuilder().withLeaseId(globalLeaseId).build())
+                                    .get(DEFAULT_REQUEST_TIMEOUT, TimeUnit.MILLISECONDS);
+                            return globalLeaseId;
+                        }
+                    }, retryPolicy);
+        } catch (Exception e) {
+            throw new IllegalStateException(e.getMessage(), e);
+        }
+    }
+
+    // easy for mock
+    public void keepAlive(long lease) {
+        this.keepAlive(lease, null);
+    }
+
+    private <T> void keepAlive(long lease, Consumer<T> onFailed) {
+        final StreamObserver<LeaseKeepAliveResponse> observer = new Observers.Builder()
+                .onError((e) -> {
+                    if (e instanceof EtcdException) {
+                        EtcdException error = (EtcdException) e;
+                        /**
+                         * ttl has expired
+                         */
+                        if (error.getErrorCode() == ErrorCode.NOT_FOUND) {
+                            keepAlive0(onFailed);
+                        }
+                    }
+                }).onCompleted(() -> {
+                    /**
+                     * deadline reached.
+                     */
+                    keepAlive0(onFailed);
+                }).build();
+
+        /**
+         * If there is already a keepalive, cancel first
+         */
+        cancelKeepAlive();
+
+        /**
+         * create and set new keepAlive to globalKeepAliveRef
+         */
+        this.keepAlive = client.getLeaseClient().keepAlive(lease, observer);
+    }
+
+    /**
+     * Grants the global lease and starts its keep-alive, unless a keep-alive is already held.
+     * The check is repeated under the instance lock so that only one caller grants the lease.
+     *
+     * @throws Exception if granting the lease fails or times out
+     */
+    private void keepAlive() throws Exception {
+        if (keepAlive != null) {
+            return;
+        }
+        synchronized (this) {
+            if (keepAlive != null) {
+                return;
+            }
+            this.globalLeaseId = client.getLeaseClient()
+                    .grant(expirePeriod)
+                    .get(DEFAULT_REQUEST_TIMEOUT, TimeUnit.MILLISECONDS)
+                    .getID();
+            // When the keep-alive fails, registration is attempted again via recovery().
+            keepAlive(globalLeaseId, unused -> recovery());
+        }
+    }
+
+    /**
+     * Invokes the keep-alive failure callback, if one was supplied.
+     * <p>
+     * The following two scenarios will cause the keep-alive failure:
+     * <ol>
+     * <li>Service is offline</li>
+     * <li>Local deadline check expired</li>
+     * </ol>
+     * The multiplexed lease cannot update the local deadline, which in extreme
+     * cases causes the service to be dropped.
+     *
+     * @param onFailed callback to run with a {@code null} argument; ignored when {@code null}
+     */
+    private <T> void keepAlive0(Consumer<T> onFailed) {
+        if (onFailed == null) {
+            return;
+        }
+        try {
+            if (logger.isWarnEnabled()) {
+                logger.warn("Failed to keep alive for global lease, waiting for retry again.");
+            }
+            onFailed.accept(null);
+        } catch (Exception e) {
+            // A failing callback must not propagate into the stream observer; log and move on.
+            logger.warn("Failed to recover from global lease expired or lease deadline exceeded.", e);
+        }
+    }
+
+    /**
+     * Re-creates every registered ephemeral path after the keep-alive was lost.
+     * Paths that fail are added to {@code failedRegistered} so that they are retried later;
+     * the whole procedure stops as soon as {@code cancelKeepAlive} is set.
+     */
+    private void recovery() {
+
+        // The client is processing reconnection.
+        if (cancelKeepAlive) {
+            return;
+        }
+
+        cancelKeepAlive();
+
+        try {
+            // Iterate over a snapshot of the registered paths.
+            for (String path : new HashSet<String>(registeredPaths)) {
+                // The client is processing reconnection: cancel the remaining registrations.
+                if (cancelKeepAlive) {
+                    return;
+                }
+                try {
+                    createEphemeral(path);
+                    failedRegistered.remove(path);
+                } catch (Exception ignored) {
+                    // Not fatal: remember the path and wait for the retry task.
+                    failedRegistered.add(path);
+                }
+            }
+        } catch (Throwable t) {
+            logger.warn("Unexpected error, failed to recover from global lease expired or deadline exceeded.", t);
+        }
+    }
+
+    /**
+     * Deletes {@code path} from etcd (with retry) and removes it from the registered paths.
+     * Whatever the outcome, the path is also removed from the failed-registration set so
+     * that it is no longer retried.
+     *
+     * @param path key to delete
+     * @throws IllegalStateException wrapping any failure of the delete call
+     */
+    public void delete(String path) {
+        try {
+            RetryLoops.invokeWithRetry(() -> {
+                requiredNotNull(client, failed);
+                client.getKVClient()
+                        .delete(ByteSequence.from(path, UTF_8))
+                        .get(DEFAULT_REQUEST_TIMEOUT, TimeUnit.MILLISECONDS);
+                registeredPaths.remove(path);
+                return null;
+            }, retryPolicy);
+        } catch (Exception e) {
+            throw new IllegalStateException(e.getMessage(), e);
+        } finally {
+            // Cancel retry for this path.
+            failedRegistered.remove(path);
+        }
+    }
+
+    /**
+     * Splits a comma separated address list into endpoint strings, prefixing every
+     * address that does not already contain a scheme separator with {@code Constants.HTTP_KEY}.
+     *
+     * @param backupAddress comma separated addresses
+     * @return one endpoint per address, in the original order
+     */
+    public String[] endPoints(String backupAddress) {
+        return Arrays.stream(backupAddress.split(Constants.COMMA_SEPARATOR))
+                .map(address -> address.contains(Constants.HTTP_SUBFIX_KEY)
+                        ? address
+                        : Constants.HTTP_KEY + address)
+                .toArray(String[]::new);
+    }
+
+    /**
+     * Waits for the etcd client to be created and schedules the periodic connection check.
+     * <p>
+     * Because jetcd does not support a connection change callback yet, we must
+     * poll to detect whether a connect or disconnect event happened. This will be changed
+     * in the future if we find a better choice.
+     * <p>
+     * NOTE(review): {@code started} is only set when the client was obtained in time, yet the
+     * check task is scheduled in either case; a later call after a timeout would schedule a
+     * second task — confirm this is intended.
+     */
+    public void start() {
+        if (!started) {
+            try {
+                // wait at most expirePeriod seconds for the client to become available
+                this.client = completableFuture.get(expirePeriod, TimeUnit.SECONDS);
+                this.connectState = isConnected();
+                this.started = true;
+            } catch (Throwable t) {
+                logger.error("Timeout! etcd3 server can not be connected in : " + expirePeriod + " seconds! url: " + url, t);
+
+                // pick up the client later, whenever the future eventually completes
+                completableFuture.whenComplete((c, e) -> {
+                    this.client = c;
+                    if (e != null) {
+                        logger.error("Got an exception when trying to create etcd3 instance, can not connect to etcd3 server, url: " + url, e);
+                    }
+                });
+
+            }
+
+            try {
+                this.future = reconnectNotify.scheduleWithFixedDelay(new Runnable() {
+                    @Override
+                    public void run() {
+                        // compare the current connection state with the last observed one
+                        boolean connected = isConnected();
+                        if (connectState != connected) {
+                            int notifyState = connected ? StateListener.CONNECTED : StateListener.DISCONNECTED;
+                            if (connectionStateListener != null) {
+                                if (connected) {
+                                    // on reconnect, drop the old keep-alive and recorded paths before notifying
+                                    clearKeepAlive();
+                                }
+                                connectionStateListener.stateChanged(getClient(), notifyState);
+                                cancelKeepAlive = false;
+                            }
+                            connectState = connected;
+                        }
+                    }
+
+                }, Constants.DEFAULT_REGISTRY_RECONNECT_PERIOD, Constants.DEFAULT_REGISTRY_RECONNECT_PERIOD, TimeUnit.MILLISECONDS);
+            } catch (Throwable t) {
+                logger.error("monitor reconnect status failed.", t);
+            }
+        }
+    }
+
+    /**
+     * Closes the current keep-alive, if there is one, and always clears the field afterwards.
+     */
+    private void cancelKeepAlive() {
+        try {
+            if (keepAlive == null) {
+                // nothing to close; the finally block still clears the field
+                return;
+            }
+            keepAlive.close();
+        } finally {
+            // drop the reference even when close() throws (helps gc)
+            keepAlive = null;
+        }
+    }
+
+    /**
+     * Flags keep-alive as cancelled, forgets all registered and failed paths and
+     * closes the current keep-alive.
+     */
+    private synchronized void clearKeepAlive() {
+        // set the flag first: recovery() and retry() check it and stop registering paths
+        cancelKeepAlive = true;
+        registeredPaths.clear();
+        failedRegistered.clear();
+        cancelKeepAlive();
+    }
+
+    /**
+     * Releases everything held by this wrapper: revokes the global lease, stops the
+     * reconnect monitor and the retry task, and finally closes the etcd client.
+     * Each step is attempted independently; a failure is logged and does not
+     * prevent the following steps.
+     */
+    protected void doClose() {
+
+        try {
+            cancelKeepAlive = true;
+            revokeLease(this.globalLeaseId);
+        } catch (Exception e) {
+            logger.warn("revoke global lease '" + globalLeaseId + "' failed, registry: " + url, e);
+        }
+
+        try {
+            // start() schedules the monitor task even when obtaining the client timed out
+            // (and 'started' therefore stayed false), so the shutdown must not depend on 'started'.
+            started = false;
+            if (future != null) {
+                future.cancel(true);
+            }
+            if (reconnectNotify != null) {
+                reconnectNotify.shutdownNow();
+            }
+        } catch (Exception e) {
+            logger.warn("stop reconnect Notify failed, registry: " + url, e);
+        }
+
+        try {
+            // Guard each step so that a missing retry future does not skip the executor shutdown.
+            if (retryFuture != null) {
+                retryFuture.cancel(true);
+            }
+            if (retryExecutor != null) {
+                retryExecutor.shutdownNow();
+            }
+        } catch (Throwable t) {
+            logger.warn(t.getMessage(), t);
+        }
+
+        Client current = getClient();
+        if (current != null) {
+            current.close();
+        }
+    }
+
+    /**
+     * Tries to get the client's shared channel. Because all fields are private in jetcd,
+     * it has to be read by reflection; in the future jetcd may provide better tools.
+     *
+     * @param client the client to read the channel from
+     * @return the current connection channel
+     * @throws RuntimeException if the channel cannot be obtained reflectively
+     */
+    private ManagedChannel newChannel(Client client) {
+        try {
+            Field managerField = client.getClass().getDeclaredField("connectionManager");
+            managerField.setAccessible(true);
+            Object manager = managerField.get(client);
+
+            Method getChannel = manager.getClass().getDeclaredMethod("getChannel");
+            getChannel.setAccessible(true);
+            return (ManagedChannel) getChannel.invoke(manager);
+        } catch (Exception e) {
+            throw new RuntimeException("Failed to obtain connection channel from " + url.getBackupAddress(), e);
+        }
+    }
+
+    /**
+     * @return the listener notified about connection state changes, or {@code null} if none is set
+     */
+    public ConnectionStateListener getConnectionStateListener() {
+        return connectionStateListener;
+    }
+
+    /**
+     * @param connectionStateListener the listener to notify about connection state changes
+     */
+    public void setConnectionStateListener(ConnectionStateListener connectionStateListener) {
+        this.connectionStateListener = connectionStateListener;
+    }
+
+    /**
+     * Throws the supplied exception when {@code obj} is {@code null}; otherwise does nothing.
+     *
+     * @param obj        the reference to check
+     * @param exeception the exception to throw if {@code obj} is {@code null}
+     */
+    public static void requiredNotNull(Object obj, RuntimeException exeception) {
+        if (obj != null) {
+            return;
+        }
+        throw exeception;
+    }
+
+    /**
+     * Tries again to create every ephemeral path recorded in {@code failedRegistered}.
+     * A path is removed from the set once it was created; paths that fail again stay in
+     * the set. Nothing is done while {@code cancelKeepAlive} is set.
+     */
+    private void retry() {
+        if (failedRegistered.isEmpty()) {
+            return;
+        }
+
+        // Work on a snapshot of the failed paths.
+        Set<String> pending = new HashSet<String>(failedRegistered);
+        if (pending.isEmpty() || cancelKeepAlive) {
+            return;
+        }
+
+        if (logger.isWarnEnabled()) {
+            logger.warn("Retry failed register(keep alive) for path '" + pending
+                    + "', path size: " + pending.size());
+        }
+        try {
+            for (String path : pending) {
+                // Is it currently reconnecting? Then give up the remaining paths.
+                if (cancelKeepAlive) {
+                    return;
+                }
+                try {
+                    createEphemeral(path);
+                    failedRegistered.remove(path);
+                } catch (Throwable t) {
+                    logger.warn("Failed to retry register(keep alive) for path '" + path + "', waiting for again, cause: " + t.getMessage(), t);
+                }
+            }
+        } catch (Throwable t) {
+            logger.warn("Failed to retry register(keep alive) for path '" + pending + "', waiting for again, cause: " + t.getMessage(), t);
+        }
+    }
+
+    /**
+     * Callback for connection state changes of the wrapped etcd client.
+     */
+    public interface ConnectionStateListener {
+        /**
+         * Invoked whenever the connection state changes.
+         *
+         * @param client   the client whose state changed
+         * @param newState the state that was entered
+         */
+        void stateChanged(Client client, int newState);
+    }
+
+    /**
+     * default request timeout
+     */
+    public static final long DEFAULT_REQUEST_TIMEOUT = obtainRequestTimeout();
+
+    public static final int DEFAULT_INBOUT_SIZE = 100 * 1024 * 1024;
+
+    public static final String GRPC_MAX_INBOUD_SIZE_KEY = "grpc.max.inbound.size";
+
+    public static final String ETCD_REQUEST_TIMEOUT_KEY = "etcd.request.timeout";
+
+    /**
+     * Reads the request timeout in milliseconds from the system property
+     * {@code ETCD_REQUEST_TIMEOUT_KEY}.
+     *
+     * @return the configured value, or 10 seconds when the property is empty or not set
+     * @throws NumberFormatException if the property is set but is not an integer
+     */
+    private static int obtainRequestTimeout() {
+        String configured = System.getProperty(ETCD_REQUEST_TIMEOUT_KEY);
+        if (!StringUtils.isNotEmpty(configured)) {
+            // default: 10 seconds
+            return 10 * 1000;
+        }
+        return Integer.parseInt(configured);
+    }
+}
\ No newline at end of file
diff --git a/dubbo-common/src/test/java/org/apache/dubbo/common/config/ConfigurationUtilsTest.java b/dubbo-remoting/dubbo-remoting-etcd3/src/main/java/org/apache/dubbo/remoting/etcd/jetcd/JEtcdTransporter.java
similarity index 51%
copy from dubbo-common/src/test/java/org/apache/dubbo/common/config/ConfigurationUtilsTest.java
copy to dubbo-remoting/dubbo-remoting-etcd3/src/main/java/org/apache/dubbo/remoting/etcd/jetcd/JEtcdTransporter.java
index 89da34c..5ddec8e 100644
--- a/dubbo-common/src/test/java/org/apache/dubbo/common/config/ConfigurationUtilsTest.java
+++ b/dubbo-remoting/dubbo-remoting-etcd3/src/main/java/org/apache/dubbo/remoting/etcd/jetcd/JEtcdTransporter.java
@@ -14,29 +14,17 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.dubbo.common.config;
+package org.apache.dubbo.remoting.etcd.jetcd;
 
-import org.apache.dubbo.common.Constants;
+import org.apache.dubbo.common.URL;
+import org.apache.dubbo.remoting.etcd.EtcdClient;
+import org.apache.dubbo.remoting.etcd.EtcdTransporter;
 
-import org.junit.jupiter.api.Assertions;
-import org.junit.jupiter.api.Test;
+public class JEtcdTransporter implements EtcdTransporter {
 
-/**
- *
- */
-public class ConfigurationUtilsTest {
-
-    @Test
-    public void testGetServerShutdownTimeout () {
-        System.setProperty(Constants.SHUTDOWN_WAIT_KEY, " 10000");
-        Assertions.assertEquals(10000, ConfigurationUtils.getServerShutdownTimeout());
-        System.clearProperty(Constants.SHUTDOWN_WAIT_KEY);
+    @Override
+    public EtcdClient connect(URL url) {
+        return new JEtcdClient(url);
     }
 
-    @Test
-    public void testGetProperty () {
-        System.setProperty(Constants.SHUTDOWN_WAIT_KEY, " 10000");
-        Assertions.assertEquals("10000", ConfigurationUtils.getProperty(Constants.SHUTDOWN_WAIT_KEY));
-        System.clearProperty(Constants.SHUTDOWN_WAIT_KEY);
-    }
 }
diff --git a/dubbo-remoting/dubbo-remoting-etcd3/src/main/java/org/apache/dubbo/remoting/etcd/jetcd/RetryLoops.java b/dubbo-remoting/dubbo-remoting-etcd3/src/main/java/org/apache/dubbo/remoting/etcd/jetcd/RetryLoops.java
new file mode 100644
index 0000000..cf8617c
--- /dev/null
+++ b/dubbo-remoting/dubbo-remoting-etcd3/src/main/java/org/apache/dubbo/remoting/etcd/jetcd/RetryLoops.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dubbo.remoting.etcd.jetcd;
+
+import io.grpc.Status;
+import org.apache.dubbo.common.logger.Logger;
+import org.apache.dubbo.common.logger.LoggerFactory;
+import org.apache.dubbo.remoting.etcd.RetryPolicy;
+import org.apache.dubbo.remoting.etcd.option.OptionUtil;
+
+import java.util.concurrent.Callable;
+
+/**
+ * Runs a task repeatedly until it succeeds, or until the failure is not retryable
+ * or the {@link RetryPolicy} refuses another attempt.
+ */
+public class RetryLoops {
+
+    private static final Logger logger = LoggerFactory.getLogger(RetryLoops.class);
+
+    /** Creation time of this loop; used to report the elapsed time to the retry policy. */
+    private final long startTimeMs = System.currentTimeMillis();
+    private boolean isDone = false;
+    private int retriedCount = 0;
+
+    /**
+     * Calls {@code task} until it returns normally.
+     *
+     * @param task        the work to execute
+     * @param retryPolicy decides whether a failed attempt is repeated
+     * @param <R>         result type of the task
+     * @return the value returned by the successful call
+     * @throws Exception the last failure, once no further retry is allowed
+     */
+    public static <R> R invokeWithRetry(Callable<R> task, RetryPolicy retryPolicy) throws Exception {
+        R result = null;
+        RetryLoops retryLoop = new RetryLoops();
+        while (retryLoop.shouldContinue()) {
+            try {
+                result = task.call();
+                retryLoop.complete();
+            } catch (Exception e) {
+                retryLoop.fireException(e, retryPolicy);
+            }
+        }
+        return result;
+    }
+
+    /**
+     * Decides what happens after a failed attempt: returns normally when the loop should
+     * try again, rethrows {@code e} otherwise.
+     *
+     * @param e           the failure of the last attempt
+     * @param retryPolicy the policy consulted for retryable failures
+     * @throws Exception {@code e} itself, when it must not or may not be retried
+     */
+    public void fireException(Exception e, RetryPolicy retryPolicy) throws Exception {
+
+        if (e instanceof InterruptedException) {
+            // Restore the interrupt flag and stop: an interrupted thread was asked to
+            // give up, so the task must not be run again.
+            Thread.currentThread().interrupt();
+            throw e;
+        }
+
+        if (isRetryException(e)
+                && retryPolicy.shouldRetry(retriedCount++, System.currentTimeMillis() - startTimeMs, true)) {
+            return;
+        }
+
+        throw e;
+    }
+
+    /** Tells whether the gRPC status derived from {@code e} is considered recoverable. */
+    private boolean isRetryException(Throwable e) {
+        Status status = Status.fromThrowable(e);
+        return OptionUtil.isRecoverable(status);
+    }
+
+    /**
+     * @return {@code true} until {@link #complete()} has been called
+     */
+    public boolean shouldContinue() {
+        return !isDone;
+    }
+
+    /** Marks the loop as finished. */
+    public void complete() {
+        isDone = true;
+    }
+
+}
diff --git a/dubbo-common/src/test/java/org/apache/dubbo/common/config/ConfigurationUtilsTest.java b/dubbo-remoting/dubbo-remoting-etcd3/src/main/java/org/apache/dubbo/remoting/etcd/jetcd/RetryNTimes.java
similarity index 52%
copy from dubbo-common/src/test/java/org/apache/dubbo/common/config/ConfigurationUtilsTest.java
copy to dubbo-remoting/dubbo-remoting-etcd3/src/main/java/org/apache/dubbo/remoting/etcd/jetcd/RetryNTimes.java
index 89da34c..7453208 100644
--- a/dubbo-common/src/test/java/org/apache/dubbo/common/config/ConfigurationUtilsTest.java
+++ b/dubbo-remoting/dubbo-remoting-etcd3/src/main/java/org/apache/dubbo/remoting/etcd/jetcd/RetryNTimes.java
@@ -14,29 +14,23 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.dubbo.common.config;
+package org.apache.dubbo.remoting.etcd.jetcd;
 
-import org.apache.dubbo.common.Constants;
+import org.apache.dubbo.remoting.etcd.AbstractRetryPolicy;
 
-import org.junit.jupiter.api.Assertions;
-import org.junit.jupiter.api.Test;
+import java.util.concurrent.TimeUnit;
 
-/**
- *
- */
-public class ConfigurationUtilsTest {
+public class RetryNTimes extends AbstractRetryPolicy {
+
+    private final long sleepMilliseconds;
 
-    @Test
-    public void testGetServerShutdownTimeout () {
-        System.setProperty(Constants.SHUTDOWN_WAIT_KEY, " 10000");
-        Assertions.assertEquals(10000, ConfigurationUtils.getServerShutdownTimeout());
-        System.clearProperty(Constants.SHUTDOWN_WAIT_KEY);
+    /**
+     * @param maxRetried maximum number of retries, handed to the base policy
+     * @param sleepTime  time to sleep between retries, expressed in {@code unit}
+     * @param unit       the time unit of {@code sleepTime}
+     */
+    public RetryNTimes(int maxRetried, int sleepTime, TimeUnit unit) {
+        super(maxRetried);
+        // TimeUnit.convert(duration, sourceUnit) converts INTO the receiver's unit, so
+        // unit.convert(sleepTime, MILLISECONDS) went the wrong way; toMillis is the intended conversion.
+        this.sleepMilliseconds = unit.toMillis(sleepTime);
     }
 
-    @Test
-    public void testGetProperty () {
-        System.setProperty(Constants.SHUTDOWN_WAIT_KEY, " 10000");
-        Assertions.assertEquals("10000", ConfigurationUtils.getProperty(Constants.SHUTDOWN_WAIT_KEY));
-        System.clearProperty(Constants.SHUTDOWN_WAIT_KEY);
+    @Override
+    protected long getSleepTime(int retried, long elapsed) {
+        return sleepMilliseconds;
     }
 }
diff --git a/dubbo-common/src/test/java/org/apache/dubbo/common/config/ConfigurationUtilsTest.java b/dubbo-remoting/dubbo-remoting-etcd3/src/main/java/org/apache/dubbo/remoting/etcd/option/Constants.java
similarity index 51%
copy from dubbo-common/src/test/java/org/apache/dubbo/common/config/ConfigurationUtilsTest.java
copy to dubbo-remoting/dubbo-remoting-etcd3/src/main/java/org/apache/dubbo/remoting/etcd/option/Constants.java
index 89da34c..c935808 100644
--- a/dubbo-common/src/test/java/org/apache/dubbo/common/config/ConfigurationUtilsTest.java
+++ b/dubbo-remoting/dubbo-remoting-etcd3/src/main/java/org/apache/dubbo/remoting/etcd/option/Constants.java
@@ -14,29 +14,17 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.dubbo.common.config;
-
-import org.apache.dubbo.common.Constants;
-
-import org.junit.jupiter.api.Assertions;
-import org.junit.jupiter.api.Test;
+package org.apache.dubbo.remoting.etcd.option;
 
 /**
- *
+ * Etcd registry constants.
  */
-public class ConfigurationUtilsTest {
+public class Constants extends org.apache.dubbo.common.Constants {
+
+    public static final String HTTP_SUBFIX_KEY = "://";
+
+    public static final String HTTP_KEY = "http://";
 
-    @Test
-    public void testGetServerShutdownTimeout () {
-        System.setProperty(Constants.SHUTDOWN_WAIT_KEY, " 10000");
-        Assertions.assertEquals(10000, ConfigurationUtils.getServerShutdownTimeout());
-        System.clearProperty(Constants.SHUTDOWN_WAIT_KEY);
-    }
+    public static final int DEFAULT_KEEPALIVE_TIMEOUT = DEFAULT_SESSION_TIMEOUT / 2;
 
-    @Test
-    public void testGetProperty () {
-        System.setProperty(Constants.SHUTDOWN_WAIT_KEY, " 10000");
-        Assertions.assertEquals("10000", ConfigurationUtils.getProperty(Constants.SHUTDOWN_WAIT_KEY));
-        System.clearProperty(Constants.SHUTDOWN_WAIT_KEY);
-    }
 }
diff --git a/dubbo-remoting/dubbo-remoting-etcd3/src/main/java/org/apache/dubbo/remoting/etcd/option/OptionUtil.java b/dubbo-remoting/dubbo-remoting-etcd3/src/main/java/org/apache/dubbo/remoting/etcd/option/OptionUtil.java
new file mode 100644
index 0000000..609f289
--- /dev/null
+++ b/dubbo-remoting/dubbo-remoting-etcd3/src/main/java/org/apache/dubbo/remoting/etcd/option/OptionUtil.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dubbo.remoting.etcd.option;
+
+import io.etcd.jetcd.ByteSequence;
+import io.grpc.Status;
+import io.netty.handler.codec.http2.Http2Exception;
+
+import java.util.Arrays;
+
+/**
+ * Helpers for etcd range options and for classifying gRPC failures.
+ */
+public class OptionUtil {
+
+    /** Range end returned when no prefix end can be computed from the prefix bytes. */
+    public static final byte[] NO_PREFIX_END = {0};
+
+    /**
+     * Computes the range end that covers every key starting with {@code prefix}: the last
+     * byte that is not {@code 0xff} is incremented and everything after it is cut off.
+     *
+     * @param prefix the key prefix
+     * @return the range end, or {@link #NO_PREFIX_END} when the prefix is empty or consists
+     * of {@code 0xff} bytes only
+     */
+    public static final ByteSequence prefixEndOf(ByteSequence prefix) {
+        byte[] endKey = prefix.getBytes().clone();
+        for (int i = endKey.length - 1; i >= 0; i--) {
+            // Java bytes are signed, so "endKey[i] < 0xff" is true for every byte and a
+            // trailing 0xff would wrap around to 0x00; compare with the byte value instead.
+            if (endKey[i] != (byte) 0xff) {
+                endKey[i] = (byte) (endKey[i] + 1);
+                return ByteSequence.from(Arrays.copyOf(endKey, i + 1));
+            }
+        }
+
+        return ByteSequence.from(NO_PREFIX_END);
+    }
+
+    /**
+     * Tells whether an operation that failed with {@code status} should be retried.
+     * <p>
+     * NOTE(review): {@link #isHaltError(Status)} is true for every code except UNAVAILABLE
+     * and INTERNAL, so nearly all failures count as recoverable here — confirm that the
+     * condition is not meant to be negated.
+     *
+     * @param status the gRPC status of the failure
+     * @return {@code true} if any of the checks below matches
+     */
+    public static boolean isRecoverable(Status status) {
+        return isHaltError(status)
+                || isNoLeaderError(status)
+                // ephemeral is expired
+                || status.getCode() == Status.Code.NOT_FOUND;
+    }
+
+    /**
+     * @param status the gRPC status of the failure
+     * @return {@code true} if the code is neither UNAVAILABLE nor INTERNAL
+     */
+    public static boolean isHaltError(Status status) {
+        // Unavailable codes mean the system will be right back.
+        // (e.g., can't connect, lost leader)
+        // Treat Internal codes as if something failed, leaving the
+        // system in an inconsistent state, but retrying could make progress.
+        // (e.g., failed in middle of send, corrupted frame)
+        return status.getCode() != Status.Code.UNAVAILABLE && status.getCode() != Status.Code.INTERNAL;
+    }
+
+    /**
+     * @param status the gRPC status of the failure
+     * @return {@code true} if the status is UNAVAILABLE with the description
+     * {@code "etcdserver: no leader"}
+     */
+    public static boolean isNoLeaderError(Status status) {
+        return status.getCode() == Status.Code.UNAVAILABLE
+                && "etcdserver: no leader".equals(status.getDescription());
+    }
+
+    /**
+     * Searches the cause chain of {@code e} (not {@code e} itself) for an
+     * {@link Http2Exception} whose error is named {@code PROTOCOL_ERROR}.
+     *
+     * @param e the failure to inspect, may be {@code null}
+     * @return {@code true} if such a cause exists
+     */
+    public static boolean isProtocolError(Throwable e) {
+        if (e == null) return false;
+        Throwable cause = e.getCause();
+        while (cause != null) {
+            if (cause instanceof Http2Exception) {
+                Http2Exception t = (Http2Exception) cause;
+                if ("PROTOCOL_ERROR".equals(t.error().name())) {
+                    return true;
+                }
+            }
+            cause = cause.getCause();
+        }
+        return false;
+    }
+}
diff --git a/dubbo-remoting/dubbo-remoting-etcd3/src/main/java/org/apache/dubbo/remoting/etcd/support/AbstractEtcdClient.java b/dubbo-remoting/dubbo-remoting-etcd3/src/main/java/org/apache/dubbo/remoting/etcd/support/AbstractEtcdClient.java
new file mode 100644
index 0000000..31752bf
--- /dev/null
+++ b/dubbo-remoting/dubbo-remoting-etcd3/src/main/java/org/apache/dubbo/remoting/etcd/support/AbstractEtcdClient.java
@@ -0,0 +1,194 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dubbo.remoting.etcd.support;
+
+import org.apache.dubbo.common.Constants;
+import org.apache.dubbo.common.URL;
+import org.apache.dubbo.common.logger.Logger;
+import org.apache.dubbo.common.logger.LoggerFactory;
+import org.apache.dubbo.common.utils.ConcurrentHashSet;
+import org.apache.dubbo.common.utils.StringUtils;
+import org.apache.dubbo.remoting.etcd.ChildListener;
+import org.apache.dubbo.remoting.etcd.EtcdClient;
+import org.apache.dubbo.remoting.etcd.StateListener;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+
+public abstract class AbstractEtcdClient<WatcherListener> implements EtcdClient {
+
+    protected static final Logger logger = LoggerFactory.getLogger(AbstractEtcdClient.class);
+
+    private final URL url;
+
+    private final Set<StateListener> stateListeners = new ConcurrentHashSet<>();
+
+    private final ConcurrentMap<String, ConcurrentMap<ChildListener, WatcherListener>> childListeners = new ConcurrentHashMap<String, ConcurrentMap<ChildListener, WatcherListener>>();
+    private final List<String> categroies = Arrays.asList(Constants.PROVIDERS_CATEGORY
+            , Constants.CONSUMERS_CATEGORY
+            , Constants.ROUTERS_CATEGORY
+            , Constants.CONFIGURATORS_CATEGORY);
+    private volatile boolean closed = false;
+
+    public AbstractEtcdClient(URL url) {
+        this.url = url;
+    }
+
+    public URL getUrl() {
+        return url;
+    }
+
+    public void create(String path) {
+        String fixedPath = fixNamespace(path);
+        createParentIfAbsent(fixedPath);
+        doCreatePersistent(fixedPath);
+    }
+
+    public long createEphemeral(String path) {
+        String fixedPath = fixNamespace(path);
+        createParentIfAbsent(fixedPath);
+        return doCreateEphemeral(path);
+    }
+
+    public void addStateListener(StateListener listener) {
+        stateListeners.add(listener);
+    }
+
+    public void removeStateListener(StateListener listener) {
+        stateListeners.remove(listener);
+    }
+
+    public Set<StateListener> getSessionListeners() {
+        return stateListeners;
+    }
+
+    public List<String> addChildListener(String path, final ChildListener listener) {
+        ConcurrentMap<ChildListener, WatcherListener> listeners = childListeners.get(path);
+        if (listeners == null) {
+            childListeners.putIfAbsent(path, new ConcurrentHashMap<ChildListener, WatcherListener>());
+            listeners = childListeners.get(path);
+        }
+        WatcherListener targetListener = listeners.get(listener);
+        if (targetListener == null) {
+            listeners.putIfAbsent(listener, createChildWatcherListener(path, listener));
+            targetListener = listeners.get(listener);
+        }
+        return addChildWatcherListener(path, targetListener);
+    }
+
+    public WatcherListener getChildListener(String path, ChildListener listener) {
+        ConcurrentMap<ChildListener, WatcherListener> listeners = childListeners.get(path);
+        if (listeners == null) {
+            return null;
+        }
+        WatcherListener targetListener = listeners.get(listener);
+        if (targetListener == null) {
+            listeners.putIfAbsent(listener, createChildWatcherListener(path, listener));
+            targetListener = listeners.get(listener);
+        }
+        return targetListener;
+    }
+
+    public void removeChildListener(String path, ChildListener listener) {
+        ConcurrentMap<ChildListener, WatcherListener> listeners = childListeners.get(path);
+        if (listeners != null) {
+            WatcherListener targetListener = listeners.remove(listener);
+            if (targetListener != null) {
+                removeChildWatcherListener(path, targetListener);
+            }
+        }
+    }
+
+    protected void stateChanged(int state) {
+        for (StateListener sessionListener : getSessionListeners()) {
+            sessionListener.stateChanged(state);
+        }
+    }
+
+    protected String fixNamespace(String path) {
+        if (StringUtils.isEmpty(path)) {
+            throw new IllegalArgumentException("path is required, actual null or ''");
+        }
+        return (path.charAt(0) != '/') ? (Constants.PATH_SEPARATOR + path) : path;
+    }
+
+    protected void createParentIfAbsent(String fixedPath) {
+        int i = fixedPath.lastIndexOf('/');
+        if (i > 0) {
+            String parentPath = fixedPath.substring(0, i);
+            if (categroies.stream().anyMatch(c -> fixedPath.endsWith(c))) {
+                if (!checkExists(parentPath)) {
+                    this.doCreatePersistent(parentPath);
+                }
+            } else if (categroies.stream().anyMatch(c -> parentPath.endsWith(c))) {
+                String grandfather = parentPath.substring(0, parentPath.lastIndexOf('/'));
+                if (!checkExists(grandfather)) {
+                    this.doCreatePersistent(grandfather);
+                }
+            }
+        }
+    }
+
+    public void close() {
+        if (closed) {
+            return;
+        }
+        closed = true;
+        try {
+            doClose();
+        } catch (Throwable t) {
+            logger.warn(t.getMessage(), t);
+        }
+    }
+
+    public abstract void doClose();
+
+    public abstract void doCreatePersistent(String path);
+
+    public abstract long doCreateEphemeral(String path);
+
+    public abstract void delete(String path);
+
+    public abstract boolean checkExists(String path);
+
+    public abstract WatcherListener createChildWatcherListener(String path, ChildListener listener);
+
+    public abstract List<String> addChildWatcherListener(String path, WatcherListener listener);
+
+    public abstract void removeChildWatcherListener(String path, WatcherListener listener);
+
+}
diff --git a/dubbo-remoting/dubbo-remoting-etcd3/src/main/resources/META-INF/dubbo/internal/org.apache.dubbo.remoting.etcd.EtcdTransporter b/dubbo-remoting/dubbo-remoting-etcd3/src/main/resources/META-INF/dubbo/internal/org.apache.dubbo.remoting.etcd.EtcdTransporter
new file mode 100644
index 0000000..d10733a
--- /dev/null
+++ b/dubbo-remoting/dubbo-remoting-etcd3/src/main/resources/META-INF/dubbo/internal/org.apache.dubbo.remoting.etcd.EtcdTransporter
@@ -0,0 +1 @@
+jetcd=org.apache.dubbo.remoting.etcd.jetcd.JEtcdTransporter
\ No newline at end of file
diff --git a/dubbo-remoting/dubbo-remoting-etcd3/src/test/java/org/apache/dubbo/remoting/etcd/jetcd/JEtcdClientTest.java b/dubbo-remoting/dubbo-remoting-etcd3/src/test/java/org/apache/dubbo/remoting/etcd/jetcd/JEtcdClientTest.java
new file mode 100644
index 0000000..19254ab
--- /dev/null
+++ b/dubbo-remoting/dubbo-remoting-etcd3/src/test/java/org/apache/dubbo/remoting/etcd/jetcd/JEtcdClientTest.java
@@ -0,0 +1,260 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dubbo.remoting.etcd.jetcd;
+
+import io.etcd.jetcd.common.exception.ClosedClientException;
+import io.grpc.Status;
+import org.apache.dubbo.common.Constants;
+import org.apache.dubbo.common.URL;
+import org.apache.dubbo.remoting.etcd.ChildListener;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Disabled;
+import org.junit.jupiter.api.Test;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+@Disabled
+public class JEtcdClientTest {
+
+    JEtcdClient client;
+
+    @Test
+    public void test_watch_when_create_path() throws InterruptedException {
+
+        String path = "/dubbo/com.alibaba.dubbo.demo.DemoService/providers";
+        String child = "/dubbo/com.alibaba.dubbo.demo.DemoService/providers/demoService1";
+
+        final CountDownLatch notNotified = new CountDownLatch(1);
+
+        ChildListener childListener = (parent, children) -> {
+            Assertions.assertEquals(1, children.size());
+            Assertions.assertEquals(child.substring(child.lastIndexOf("/") + 1), children.get(0));
+            notNotified.countDown();
+        };
+
+        client.addChildListener(path, childListener);
+
+        client.createEphemeral(child);
+        Assertions.assertTrue(notNotified.await(10, TimeUnit.SECONDS));
+
+        client.removeChildListener(path, childListener);
+        client.delete(child);
+    }
+
+    @Test
+    public void test_watch_when_create_wrong_path() throws InterruptedException {
+
+        String path = "/dubbo/com.alibaba.dubbo.demo.DemoService/providers";
+        String child = "/dubbo/com.alibaba.dubbo.demo.DemoService/routers/demoService1";
+
+        final CountDownLatch notNotified = new CountDownLatch(1);
+
+        ChildListener childListener = (parent, children) -> {
+            Assertions.assertEquals(1, children.size());
+            Assertions.assertEquals(child, children.get(0));
+            notNotified.countDown();
+        };
+
+        client.addChildListener(path, childListener);
+
+        client.createEphemeral(child);
+        Assertions.assertFalse(notNotified.await(1, TimeUnit.SECONDS));
+
+        client.removeChildListener(path, childListener);
+        client.delete(child);
+    }
+
+    @Test
+    public void test_watch_when_delete_path() throws InterruptedException {
+
+        String path = "/dubbo/com.alibaba.dubbo.demo.DemoService/providers";
+        String child = "/dubbo/com.alibaba.dubbo.demo.DemoService/providers/demoService1";
+
+        final CountDownLatch notNotified = new CountDownLatch(1);
+
+        ChildListener childListener = (parent, children) -> {
+            Assertions.assertEquals(0, children.size());
+            notNotified.countDown();
+        };
+
+        client.createEphemeral(child);
+
+        client.addChildListener(path, childListener);
+        client.delete(child);
+
+        Assertions.assertTrue(notNotified.await(10, TimeUnit.SECONDS));
+        client.removeChildListener(path, childListener);
+    }
+
+    @Test
+    public void test_watch_then_unwatch() throws InterruptedException {
+
+        String path = "/dubbo/com.alibaba.dubbo.demo.DemoService/providers";
+        String child = "/dubbo/com.alibaba.dubbo.demo.DemoService/providers/demoService2";
+
+        final CountDownLatch notNotified = new CountDownLatch(1);
+        final CountDownLatch notTwiceNotified = new CountDownLatch(2);
+
+        final Holder notified = new Holder();
+
+        ChildListener childListener = (parent, children) -> {
+            Assertions.assertEquals(1, children.size());
+            Assertions.assertEquals(child.substring(child.lastIndexOf("/") + 1), children.get(0));
+            notNotified.countDown();
+            notTwiceNotified.countDown();
+            notified.getAndIncrease();
+        };
+
+        client.addChildListener(path, childListener);
+
+        client.createEphemeral(child);
+        Assertions.assertTrue(notNotified.await(15, TimeUnit.SECONDS));
+
+        client.removeChildListener(path, childListener);
+        client.delete(child);
+
+        Assertions.assertFalse(notTwiceNotified.await(5, TimeUnit.SECONDS));
+        Assertions.assertEquals(1, notified.value);
+        client.delete(child);
+    }
+
+    /**
+     * Signals an ABORTED error on the watcher's request stream and then tries
+     * to send another request on it; any exception raised along the way must
+     * mention "call was cancelled".
+     * <p>
+     * NOTE(review): if no exception is thrown at all, this test passes without
+     * asserting anything.
+     */
+    @Test
+    public void test_watch_on_unrecoverable_connection() throws InterruptedException {
+
+        final String path = "/dubbo/com.alibaba.dubbo.demo.DemoService/providers";
+        try {
+            ChildListener onChildren = (parent, children) -> Assertions.assertEquals(path, parent);
+            client.addChildListener(path, onChildren);
+
+            JEtcdClient.EtcdWatcher watcher = client.getChildListener(path, onChildren);
+            watcher.watchRequest.onError(Status.ABORTED.withDescription("connection error").asRuntimeException());
+
+            // sending on the stream after the error above
+            watcher.watchRequest.onNext(watcher.nextRequest());
+        } catch (Exception raised) {
+            Assertions.assertTrue(raised.getMessage().contains("call was cancelled"));
+        }
+    }
+
+    /**
+     * Verifies that the listener keeps working after the watcher has been
+     * handed an UNAVAILABLE error: the first callback reports the created
+     * child, and after the simulated error the delete must produce a second
+     * callback with an empty children list.
+     */
+    @Test
+    public void test_watch_on_recoverable_connection() throws InterruptedException {
+
+        String path = "/dubbo/com.alibaba.dubbo.demo.DemoService/connection";
+        String child = "/dubbo/com.alibaba.dubbo.demo.DemoService/connection/demoService1";
+
+        final CountDownLatch notNotified = new CountDownLatch(1);
+        final CountDownLatch notTwiceNotified = new CountDownLatch(2);
+        final Holder notified = new Holder();
+        ChildListener childListener = (parent, children) -> {
+            notTwiceNotified.countDown();
+            switch (notified.increaseAndGet()) {
+                case 1: {
+                    notNotified.countDown();
+                    // assertEquals reports expected vs. actual size on failure
+                    Assertions.assertEquals(1, children.size());
+                    Assertions.assertEquals(child.substring(child.lastIndexOf("/") + 1), children.get(0));
+                    break;
+                }
+                case 2: {
+                    Assertions.assertEquals(0, children.size());
+                    Assertions.assertEquals(path, parent);
+                    break;
+                }
+                default:
+                    Assertions.fail("too many callbacks invoked.");
+            }
+        };
+
+        client.addChildListener(path, childListener);
+        client.createEphemeral(child);
+
+        // make sure first time callback successfully
+        Assertions.assertTrue(notNotified.await(15, TimeUnit.SECONDS));
+
+        // connection error causes client to release all resources including current watcher
+        JEtcdClient.EtcdWatcher watcher = client.getChildListener(path, childListener);
+        watcher.onError(Status.UNAVAILABLE.withDescription("temporary connection issue").asRuntimeException());
+
+        // trigger delete after unavailable
+        client.delete(child);
+        Assertions.assertTrue(notTwiceNotified.await(15, TimeUnit.SECONDS));
+
+        client.removeChildListener(path, childListener);
+    }
+
+    /**
+     * Adding a child listener on a client that has already been closed must
+     * fail with a {@link ClosedClientException} naming the path.
+     * <p>
+     * {@code assertThrows} is used so the test fails when no exception is
+     * raised, instead of passing silently.
+     */
+    @Test
+    public void test_watch_after_client_closed() throws InterruptedException {
+
+        String path = "/dubbo/com.alibaba.dubbo.demo.DemoService/providers";
+        client.close();
+
+        ClosedClientException e = Assertions.assertThrows(ClosedClientException.class,
+                () -> client.addChildListener(path, (parent, children) -> {
+                    Assertions.assertEquals(path, parent);
+                }));
+        Assertions.assertEquals("watch client has been closed, path '" + path + "'", e.getMessage());
+    }
+
+    /** Creates a fresh client against the etcd endpoint at 127.0.0.1:2379 before each test. */
+    @BeforeEach
+    public void setUp() {
+        // session timeout of 15 seconds
+        final int sessionTimeoutMillis = 15000;
+        URL registryUrl = URL.valueOf("etcd3://127.0.0.1:2379/com.alibaba.dubbo.registry.RegistryService");
+
+        client = new JEtcdClient(registryUrl.addParameter(Constants.SESSION_TIMEOUT_KEY, sessionTimeoutMillis));
+    }
+
+    /** Closes the client created in {@code setUp} after each test. */
+    @AfterEach
+    public void tearDown() {
+        client.close();
+    }
+
+    /**
+     * Small mutable counter the tests use to count listener callbacks.
+     * Both increment methods are synchronized; {@code value} is volatile and
+     * is also read directly by the tests.
+     */
+    static class Holder {
+
+        volatile int value;
+
+        /** Increments the counter and returns the value it held before the increment. */
+        synchronized int getAndIncrease() {
+            final int before = value;
+            value = before + 1;
+            return before;
+        }
+
+        /** Increments the counter and returns the value after the increment. */
+        synchronized int increaseAndGet() {
+            value = value + 1;
+            return value;
+        }
+    }
+}
diff --git a/dubbo-remoting/dubbo-remoting-etcd3/src/test/java/org/apache/dubbo/remoting/etcd/jetcd/JEtcdClientWrapperTest.java b/dubbo-remoting/dubbo-remoting-etcd3/src/test/java/org/apache/dubbo/remoting/etcd/jetcd/JEtcdClientWrapperTest.java
new file mode 100644
index 0000000..b7d2671
--- /dev/null
+++ b/dubbo-remoting/dubbo-remoting-etcd3/src/test/java/org/apache/dubbo/remoting/etcd/jetcd/JEtcdClientWrapperTest.java
@@ -0,0 +1,186 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dubbo.remoting.etcd.jetcd;
+
+import org.apache.dubbo.common.Constants;
+import org.apache.dubbo.common.URL;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Disabled;
+import org.junit.jupiter.api.Test;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.locks.LockSupport;
+
+import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.spy;
+
+@Disabled
+public class JEtcdClientWrapperTest {
+
+    JEtcdClientWrapper clientWrapper;
+
+    @Test
+    public void test_path_exists() {
+        String path = "/dubbo/org.apache.dubbo.demo.DemoService/providers";
+        clientWrapper.createPersistent(path);
+        Assertions.assertTrue(clientWrapper.checkExists(path));
+        Assertions.assertFalse(clientWrapper.checkExists(path + "/noneexits"));
+        clientWrapper.delete(path);
+    }
+
+    @Test
+    public void test_create_emerphal_path() {
+        String path = "/dubbo/org.apache.dubbo.demo.DemoService/providers";
+        clientWrapper.createEphemeral(path);
+        Assertions.assertTrue(clientWrapper.checkExists(path));
+        clientWrapper.delete(path);
+    }
+
+    /**
+     * Revokes one lease immediately after creating it, and a second one only
+     * after waiting two seconds. Both leases are created with argument 1.
+     * The test passes as long as neither revoke throws.
+     */
+    @Test
+    public void test_grant_lease_then_revoke() {
+        long revokedAtOnce = clientWrapper.createLease(1);
+        clientWrapper.revokeLease(revokedAtOnce);
+
+        long revokedLate = clientWrapper.createLease(1);
+        final long twoSecondsNanos = TimeUnit.SECONDS.toNanos(2);
+        LockSupport.parkNanos(this, twoSecondsNanos);
+        // test timeout of lease
+        clientWrapper.revokeLease(revokedLate);
+    }
+
+    /**
+     * Creates an ephemeral path on a spied wrapper whose {@code keepAlive}
+     * is stubbed to block for two seconds and, from the second invocation
+     * on, throw a {@link TimeoutException}. If {@code createEphemeral} fails
+     * with an {@link IllegalStateException}, its message must name the path.
+     * <p>
+     * The spy is held in a local variable instead of temporarily replacing
+     * the shared {@code clientWrapper} field, so a failure while building the
+     * spy can no longer close the shared client twice.
+     */
+    @Test
+    public void test_create_emerphal_path_then_timeout() {
+        String path = "/dubbo/org.apache.dubbo.demo.DemoService/providers";
+
+        URL url = URL.valueOf("etcd3://127.0.0.1:2379/org.apache.dubbo.registry.RegistryService")
+                .addParameter(Constants.SESSION_TIMEOUT_KEY, 1000);
+
+        JEtcdClientWrapper spied = spy(new JEtcdClientWrapper(url));
+        try {
+            spied.start();
+
+            doAnswer(new Answer<Object>() {
+                // number of keepAlive invocations answered so far
+                int timeout;
+
+                @Override
+                public Object answer(InvocationOnMock invocation) throws Throwable {
+                    LockSupport.parkNanos(this, TimeUnit.SECONDS.toNanos(2));
+                    if (timeout++ > 0) {
+                        throw new TimeoutException();
+                    }
+                    return null;
+                }
+            }).when(spied).keepAlive(anyLong());
+
+            try {
+                spied.createEphemeral(path);
+            } catch (IllegalStateException ex) {
+                Assertions.assertEquals("failed to create ephereral by path '" + path + "'", ex.getMessage());
+            }
+
+        } finally {
+            spied.doClose();
+        }
+    }
+
+    /**
+     * Creates five ephemeral nodes, four directly under the providers path
+     * and one a level deeper, and expects {@code getChildren} to return
+     * exactly four entries, each equal to one of the created paths. Every
+     * returned entry is deleted afterwards.
+     */
+    @Test
+    public void test_get_emerphal_children_path() {
+        final String path = "/dubbo/org.apache.dubbo.demo.DemoService/providers";
+        final String[] children = {
+                "/dubbo/org.apache.dubbo.demo.DemoService/providers/service1",
+                "/dubbo/org.apache.dubbo.demo.DemoService/providers/service2",
+                "/dubbo/org.apache.dubbo.demo.DemoService/providers/service3",
+                "/dubbo/org.apache.dubbo.demo.DemoService/providers/service4",
+                "/dubbo/org.apache.dubbo.demo.DemoService/providers/service5/exclude"
+        };
+
+        for (String child : children) {
+            Assertions.assertFalse(clientWrapper.checkExists(child));
+            clientWrapper.createEphemeral(child);
+        }
+
+        List<String> listed = clientWrapper.getChildren(path);
+
+        Assertions.assertEquals(4, listed.size());
+        for (String child : listed) {
+            boolean known = Arrays.stream(children).anyMatch(child::equals);
+            Assertions.assertTrue(known);
+            clientWrapper.delete(child);
+        }
+    }
+
+    /**
+     * Starts a dedicated wrapper from a URL listing a primary address plus
+     * two backup addresses and runs a create / exists / delete round trip on
+     * it. The wrapper is always closed at the end.
+     */
+    @Test
+    public void test_connect_cluster() {
+        final URL clusterUrl = URL.valueOf("etcd3://127.0.0.1:22379/org.apache.dubbo.registry.RegistryService?backup=127.0.0.1:2379,127.0.0.1:32379");
+        final JEtcdClientWrapper cluster = new JEtcdClientWrapper(clusterUrl);
+        try {
+            cluster.start();
+
+            final String path = "/dubbo/org.apache.dubbo.demo.DemoService/providers";
+            final String neverCreated = path + "/noneexits";
+
+            cluster.createEphemeral(path);
+            Assertions.assertTrue(cluster.checkExists(path));
+            Assertions.assertFalse(cluster.checkExists(neverCreated));
+            cluster.delete(path);
+        } finally {
+            cluster.doClose();
+        }
+    }
+
+    /** Builds and starts a wrapper against the etcd endpoint at 127.0.0.1:2379 before each test. */
+    @BeforeEach
+    public void setUp() {
+        clientWrapper = new JEtcdClientWrapper(
+                URL.valueOf("etcd3://127.0.0.1:2379/org.apache.dubbo.registry.RegistryService"));
+        clientWrapper.start();
+    }
+
+    /** Closes the wrapper created in {@code setUp} after each test. */
+    @AfterEach
+    public void tearDown() {
+        clientWrapper.doClose();
+    }
+}
diff --git a/dubbo-remoting/pom.xml b/dubbo-remoting/pom.xml
index d646c22..fd17dbf 100644
--- a/dubbo-remoting/pom.xml
+++ b/dubbo-remoting/pom.xml
@@ -38,5 +38,6 @@
         <module>dubbo-remoting-http</module>
         <module>dubbo-remoting-zookeeper</module>
         <module>dubbo-remoting-netty4</module>
+        <module>dubbo-remoting-etcd3</module>
     </modules>
 </project>
\ No newline at end of file