You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dubbo.apache.org by al...@apache.org on 2022/07/21 09:28:39 UTC

[dubbo-spi-extensions] branch master updated: Enhance registry module (#134)

This is an automated email from the ASF dual-hosted git repository.

albumenj pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/dubbo-spi-extensions.git


The following commit(s) were added to refs/heads/master by this push:
     new 04fc377  Enhance registry module (#134)
04fc377 is described below

commit 04fc3772f807000e20a955048270bac7794dd7d1
Author: Albumen Kevin <jh...@gmail.com>
AuthorDate: Thu Jul 21 17:28:35 2022 +0800

    Enhance registry module (#134)
    
    * add consul registry
    
    * add etcd3 registry
    
    * add eureka registry
    
    * add redis registry
    
    * add sofa registry
    
    * opt pom
    
    * try adapt consul
    
    * fix consul
    
    * try adapt etcd
    
    * try to adapt sofa
    
    * remove eureka
    
    * fix consul
    
    * fix compile
    
    * fix checkstyle
---
 dubbo-extensions-dependencies-bom/pom.xml          |  56 +++
 .../dubbo-registry-consul/pom.xml                  |  63 +++
 .../registry/consul/AbstractConsulRegistry.java    |  35 ++
 .../dubbo/registry/consul/ConsulConstants.java     |  34 ++
 .../dubbo/registry/consul/ConsulParameter.java     |  87 ++++
 .../dubbo/registry/consul/ConsulRegistry.java      | 384 +++++++++++++++
 .../registry/consul/ConsulRegistryFactory.java     |  32 ++
 .../registry/consul/ConsulServiceDiscovery.java    | 469 +++++++++++++++++++
 .../consul/ConsulServiceDiscoveryFactory.java      |  30 ++
 .../org.apache.dubbo.registry.RegistryFactory      |   1 +
 ...e.dubbo.registry.client.ServiceDiscoveryFactory |   1 +
 .../dubbo/registry/consul/ConsulRegistryTest.java  | 151 ++++++
 .../consul/ConsulServiceDiscoveryTest.java         | 149 ++++++
 .../dubbo-registry-etcd3/pom.xml                   |  51 ++
 .../apache/dubbo/registry/etcd/EtcdRegistry.java   | 365 +++++++++++++++
 .../dubbo/registry/etcd/EtcdRegistryFactory.java   |  36 ++
 .../dubbo/registry/etcd/EtcdServiceDiscovery.java  | 240 ++++++++++
 .../registry/etcd/EtcdServiceDiscoveryFactory.java |  30 ++
 .../org.apache.dubbo.registry.RegistryFactory      |   1 +
 ...e.dubbo.registry.client.ServiceDiscoveryFactory |   1 +
 .../dubbo/registry/etcd/EtcdRegistryTest.java      | 329 +++++++++++++
 .../registry/etcd/EtcdServiceDiscoveryTest.java    | 124 +++++
 .../dubbo-registry-redis/pom.xml                   |  59 +++
 .../apache/dubbo/registry/redis/RedisRegistry.java | 519 +++++++++++++++++++++
 .../dubbo/registry/redis/RedisRegistryFactory.java |  34 ++
 .../org.apache.dubbo.registry.RegistryFactory      |   1 +
 .../dubbo/registry/redis/RedisRegistryTest.java    | 250 ++++++++++
 .../dubbo-registry-sofa/pom.xml                    | 130 ++++++
 .../apache/dubbo/registry/sofa/SofaRegistry.java   | 295 ++++++++++++
 .../dubbo/registry/sofa/SofaRegistryConstants.java |  43 ++
 .../dubbo/registry/sofa/SofaRegistryFactory.java   |  41 ++
 .../dubbo/registry/sofa/SofaRegistryInstance.java  |  90 ++++
 .../sofa/SofaRegistryServiceDiscovery.java         | 301 ++++++++++++
 .../sofa/SofaRegistryServiceDiscoveryFactory.java  |  28 ++
 .../org.apache.dubbo.registry.RegistryFactory      |   1 +
 ...g.apache.dubbo.registry.client.ServiceDiscovery |   1 +
 ...e.dubbo.registry.client.ServiceDiscoveryFactory |   1 +
 .../apache/dubbo/registry/sofa/HelloService.java   |  24 +
 .../dubbo/registry/sofa/HelloServiceImpl.java      |  44 ++
 .../src/test/resources/log4j.properties            |   7 +
 dubbo-registry-extensions/pom.xml                  |   4 +
 41 files changed, 4542 insertions(+)

diff --git a/dubbo-extensions-dependencies-bom/pom.xml b/dubbo-extensions-dependencies-bom/pom.xml
index 41a1230..346535a 100644
--- a/dubbo-extensions-dependencies-bom/pom.xml
+++ b/dubbo-extensions-dependencies-bom/pom.xml
@@ -128,6 +128,11 @@
         <consul_client_version>1.3.7</consul_client_version>
         <test_container_version>1.15.2</test_container_version>
         <seata_version>1.5.2</seata_version>
+        <eureka.version>1.9.12</eureka.version>
+        <sofa_registry_version>5.2.0</sofa_registry_version>
+        <logback_version>1.2.11</logback_version>
+        <rs_api_version>2.0</rs_api_version>
+        <resteasy_version>3.0.19.Final</resteasy_version>
 
         <maven_flatten_version>1.2.5</maven_flatten_version>
     </properties>
@@ -427,6 +432,57 @@
                 <artifactId>seata-core</artifactId>
                 <version>${seata_version}</version>
             </dependency>
+            <dependency>
+                <groupId>com.netflix.eureka</groupId>
+                <artifactId>eureka-client</artifactId>
+                <version>${eureka.version}</version>
+            </dependency>
+            <dependency>
+                <groupId>com.netflix.eureka</groupId>
+                <artifactId>eureka-core</artifactId>
+                <version>${eureka.version}</version>
+            </dependency>
+            <dependency>
+                <groupId>com.alipay.sofa</groupId>
+                <artifactId>registry-client-all</artifactId>
+                <version>${sofa_registry_version}</version>
+            </dependency>
+            <dependency>
+                <groupId>com.alipay.sofa</groupId>
+                <artifactId>registry-test</artifactId>
+                <version>${sofa_registry_version}</version>
+                <scope>test</scope>
+            </dependency>
+            <dependency>
+                <groupId>ch.qos.logback</groupId>
+                <artifactId>logback-classic</artifactId>
+                <version>${logback_version}</version>
+            </dependency>
+            <dependency>
+                <groupId>javax.ws.rs</groupId>
+                <artifactId>javax.ws.rs-api</artifactId>
+                <version>${rs_api_version}</version>
+            </dependency>
+            <dependency>
+                <groupId>org.jboss.resteasy</groupId>
+                <artifactId>resteasy-jaxrs</artifactId>
+                <version>${resteasy_version}</version>
+            </dependency>
+            <dependency>
+                <groupId>org.jboss.resteasy</groupId>
+                <artifactId>resteasy-client</artifactId>
+                <version>${resteasy_version}</version>
+            </dependency>
+            <dependency>
+                <groupId>org.jboss.resteasy</groupId>
+                <artifactId>resteasy-netty4</artifactId>
+                <version>${resteasy_version}</version>
+                <exclusions>
+                    <exclusion>
+                        <groupId>io.netty</groupId>
+                        <artifactId>netty-all</artifactId>
+                    </exclusion>
+                </exclusions>
+            </dependency>
         </dependencies>
     </dependencyManagement>
 
diff --git a/dubbo-registry-extensions/dubbo-registry-consul/pom.xml b/dubbo-registry-extensions/dubbo-registry-consul/pom.xml
new file mode 100644
index 0000000..8674ed4
--- /dev/null
+++ b/dubbo-registry-extensions/dubbo-registry-consul/pom.xml
@@ -0,0 +1,63 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one or more
+  ~ contributor license agreements.  See the NOTICE file distributed with
+  ~ this work for additional information regarding copyright ownership.
+  ~ The ASF licenses this file to You under the Apache License, Version 2.0
+  ~ (the "License"); you may not use this file except in compliance with
+  ~ the License.  You may obtain a copy of the License at
+  ~
+  ~     http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  -->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <parent>
+        <artifactId>dubbo-registry-extensions</artifactId>
+        <groupId>org.apache.dubbo.extensions</groupId>
+        <version>${revision}</version>
+        <relativePath>../pom.xml</relativePath>
+    </parent>
+    <modelVersion>4.0.0</modelVersion>
+
+    <version>1.0.0-SNAPSHOT</version>
+    <artifactId>dubbo-registry-consul</artifactId>
+
+    <properties>
+        <skipIntegrationTests>true</skipIntegrationTests>
+    </properties>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.dubbo</groupId>
+            <artifactId>dubbo-registry-api</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>com.ecwid.consul</groupId>
+            <artifactId>consul-api</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>com.pszymczyk.consul</groupId>
+            <artifactId>embedded-consul</artifactId>
+            <scope>test</scope>
+        </dependency>
+    </dependencies>
+
+    <build>
+        <plugins>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-surefire-plugin</artifactId>
+                <configuration>
+                    <skipTests>${skipIntegrationTests}</skipTests>
+                </configuration>
+            </plugin>
+        </plugins>
+    </build>
+
+</project>
diff --git a/dubbo-registry-extensions/dubbo-registry-consul/src/main/java/org/apache/dubbo/registry/consul/AbstractConsulRegistry.java b/dubbo-registry-extensions/dubbo-registry-consul/src/main/java/org/apache/dubbo/registry/consul/AbstractConsulRegistry.java
new file mode 100644
index 0000000..f18bc45
--- /dev/null
+++ b/dubbo-registry-extensions/dubbo-registry-consul/src/main/java/org/apache/dubbo/registry/consul/AbstractConsulRegistry.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dubbo.registry.consul;
+
+/**
+ * Holder for the constants shared by the Consul registry classes of this package.
+ *
+ * @author cvictory ON 2019-08-02
+ */
+public class AbstractConsulRegistry {
+
+    /** Tag added to every service registered by Dubbo and used to filter Consul queries. */
+    static final String SERVICE_TAG = "dubbo";
+
+    /** Key of the Consul service metadata entry that stores the full Dubbo URL. */
+    static final String URL_META_KEY = "url";
+
+    /** URL parameter that overrides the TTL check interval (milliseconds). */
+    static final String CHECK_PASS_INTERVAL = "consul-check-pass-interval";
+
+    /** TTL check interval in milliseconds applied when the URL carries no override. */
+    static final long DEFAULT_CHECK_PASS_INTERVAL = 16_000L;
+
+    /** The check-pass task runs every (interval / PERIOD_DENOMINATOR) milliseconds. */
+    static final int PERIOD_DENOMINATOR = 8;
+
+    /** URL parameter that overrides the "deregister critical service after" duration. */
+    static final String DEREGISTER_AFTER = "consul-deregister-critical-service-after";
+
+    /** Default duration after which a service whose check stays critical is deregistered. */
+    static final String DEFAULT_DEREGISTER_TIME = "20s";
+
+    /** Divisor used to convert millisecond settings into seconds. */
+    static final int ONE_THOUSAND = 1_000;
+}
diff --git a/dubbo-registry-extensions/dubbo-registry-consul/src/main/java/org/apache/dubbo/registry/consul/ConsulConstants.java b/dubbo-registry-extensions/dubbo-registry-consul/src/main/java/org/apache/dubbo/registry/consul/ConsulConstants.java
new file mode 100644
index 0000000..1976f30
--- /dev/null
+++ b/dubbo-registry-extensions/dubbo-registry-consul/src/main/java/org/apache/dubbo/registry/consul/ConsulConstants.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dubbo.registry.consul;
+
+/**
+ * Common configuration for configCenter, metadata, and registry modules
+ * that talk to Consul.
+ */
+public interface ConsulConstants {
+
+    /** Port value treated as "no port configured"; callers then fall back to {@link #DEFAULT_PORT}. */
+    int INVALID_PORT = 0;
+
+    /** Port used for the Consul agent when the URL does not specify one. */
+    int DEFAULT_PORT = 8500;
+
+    /** URL parameter that overrides the watch timeout (milliseconds). */
+    String WATCH_TIMEOUT = "consul-watch-timeout";
+
+    /** Watch timeout in milliseconds applied when the URL carries no override (one minute). */
+    int DEFAULT_WATCH_TIMEOUT = 60_000;
+
+    /** Parameter name "token" - presumably the Consul access token key; confirm against callers. */
+    String TOKEN = "token";
+}
diff --git a/dubbo-registry-extensions/dubbo-registry-consul/src/main/java/org/apache/dubbo/registry/consul/ConsulParameter.java b/dubbo-registry-extensions/dubbo-registry-consul/src/main/java/org/apache/dubbo/registry/consul/ConsulParameter.java
new file mode 100644
index 0000000..f3fb024
--- /dev/null
+++ b/dubbo-registry-extensions/dubbo-registry-consul/src/main/java/org/apache/dubbo/registry/consul/ConsulParameter.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dubbo.registry.consul;
+
+import org.apache.dubbo.common.URL;
+
+import static org.apache.dubbo.common.utils.StringUtils.isBlank;
+
+/**
+ * The enumeration for the Consul's parameters on the {@link URL}.
+ * <p>
+ * Each constant maps to one URL parameter. Unless a name is given explicitly, the parameter
+ * name is derived from the constant name (lower-cased, with '_' replaced by '-'), e.g.
+ * {@code ACL_TOKEN} reads the parameter {@code acl-token}.
+ *
+ * @see URL#getParameters()
+ * @since 2.7.8
+ */
+public enum ConsulParameter {
+
+    ACL_TOKEN,
+
+    TAGS,
+
+    INSTANCE_ZONE,
+
+    // the single argument is the default value, not the parameter name
+    DEFAULT_ZONE_METADATA_NAME("zone"),
+
+    INSTANCE_GROUP,
+
+    CONSISTENCY_MODE,
+
+    ;
+
+    /** Name of the URL parameter this constant reads. */
+    private final String name;
+
+    /** Value returned by {@link #getValue(URL)} when the parameter is absent; may be null. */
+    private final String defaultValue;
+
+    ConsulParameter() {
+        this(null);
+    }
+
+    ConsulParameter(String defaultValue) {
+        this(null, defaultValue);
+    }
+
+    ConsulParameter(String name, String defaultValue) {
+        this.name = isBlank(name) ? defaultName() : name;
+        this.defaultValue = defaultValue;
+    }
+
+    /**
+     * Derives the parameter name from the constant name.
+     *
+     * @return the constant name in lower case with '_' replaced by '-'
+     */
+    private String defaultName() {
+        // Locale.ROOT keeps the key stable: with the platform default locale a Turkish JVM
+        // would lower-case 'I' to a dotless 'ı' and produce a different parameter name.
+        return name().toLowerCase(java.util.Locale.ROOT).replace('_', '-');
+    }
+
+    /**
+     * The parameter value from the specified registry {@link URL}
+     *
+     * @param registryURL the specified registry {@link URL}
+     * @return <code>defaultValue</code> if not found
+     */
+    public String getValue(URL registryURL) {
+        return registryURL.getParameter(name, defaultValue);
+    }
+
+    /**
+     * The parameter value from the specified registry {@link URL}
+     *
+     * @param registryURL  the specified registry {@link URL}
+     * @param valueType    the type of parameter value
+     * @param defaultValue the default value if parameter is absent
+     * @return <code>defaultValue</code> if not found
+     */
+    public <T> T getValue(URL registryURL, Class<T> valueType, T defaultValue) {
+        return registryURL.getParameter(name, valueType, defaultValue);
+    }
+}
diff --git a/dubbo-registry-extensions/dubbo-registry-consul/src/main/java/org/apache/dubbo/registry/consul/ConsulRegistry.java b/dubbo-registry-extensions/dubbo-registry-consul/src/main/java/org/apache/dubbo/registry/consul/ConsulRegistry.java
new file mode 100644
index 0000000..2338907
--- /dev/null
+++ b/dubbo-registry-extensions/dubbo-registry-consul/src/main/java/org/apache/dubbo/registry/consul/ConsulRegistry.java
@@ -0,0 +1,384 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dubbo.registry.consul;
+
+import org.apache.dubbo.common.URL;
+import org.apache.dubbo.common.URLBuilder;
+import org.apache.dubbo.common.logger.Logger;
+import org.apache.dubbo.common.logger.LoggerFactory;
+import org.apache.dubbo.common.utils.CollectionUtils;
+import org.apache.dubbo.common.utils.NamedThreadFactory;
+import org.apache.dubbo.common.utils.UrlUtils;
+import org.apache.dubbo.registry.NotifyListener;
+import org.apache.dubbo.registry.support.FailbackRegistry;
+import org.apache.dubbo.rpc.RpcException;
+
+import com.ecwid.consul.v1.ConsulClient;
+import com.ecwid.consul.v1.QueryParams;
+import com.ecwid.consul.v1.Response;
+import com.ecwid.consul.v1.agent.model.NewService;
+import com.ecwid.consul.v1.catalog.CatalogServicesRequest;
+import com.ecwid.consul.v1.health.HealthServicesRequest;
+import com.ecwid.consul.v1.health.model.HealthService;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+import static java.util.concurrent.Executors.newCachedThreadPool;
+import static org.apache.dubbo.common.constants.CommonConstants.ANY_VALUE;
+import static org.apache.dubbo.common.constants.RegistryConstants.CATEGORY_KEY;
+import static org.apache.dubbo.common.constants.RegistryConstants.EMPTY_PROTOCOL;
+import static org.apache.dubbo.registry.Constants.CONSUMER_PROTOCOL;
+import static org.apache.dubbo.registry.Constants.PROVIDER_PROTOCOL;
+import static org.apache.dubbo.registry.consul.AbstractConsulRegistry.CHECK_PASS_INTERVAL;
+import static org.apache.dubbo.registry.consul.AbstractConsulRegistry.DEFAULT_CHECK_PASS_INTERVAL;
+import static org.apache.dubbo.registry.consul.AbstractConsulRegistry.DEFAULT_DEREGISTER_TIME;
+import static org.apache.dubbo.registry.consul.AbstractConsulRegistry.DEREGISTER_AFTER;
+import static org.apache.dubbo.registry.consul.AbstractConsulRegistry.ONE_THOUSAND;
+import static org.apache.dubbo.registry.consul.AbstractConsulRegistry.PERIOD_DENOMINATOR;
+import static org.apache.dubbo.registry.consul.AbstractConsulRegistry.SERVICE_TAG;
+import static org.apache.dubbo.registry.consul.AbstractConsulRegistry.URL_META_KEY;
+import static org.apache.dubbo.rpc.Constants.TOKEN_KEY;
+
+/**
+ * Registry center implementation for Consul.
+ * <p>
+ * Provider URLs are registered as Consul agent services carrying a TTL check that
+ * {@link #checkPass()} renews on a fixed schedule. Subscriptions are served by
+ * {@link ConsulNotifier} tasks that keep querying Consul with the last seen index and
+ * notify the subscribed listeners whenever that index advances.
+ */
+public class ConsulRegistry extends FailbackRegistry {
+    private static final Logger logger = LoggerFactory.getLogger(ConsulRegistry.class);
+
+    private ConsulClient client;
+    /**
+     * TTL check interval in milliseconds; the renewal task runs every
+     * (checkPassInterval / PERIOD_DENOMINATOR) milliseconds.
+     */
+    private long checkPassInterval;
+    /**
+     * Executes the {@link ConsulNotifier} loops (daemon threads).
+     */
+    private ExecutorService notifierExecutor = newCachedThreadPool(
+            new NamedThreadFactory("dubbo-consul-notifier", true));
+    /**
+     * Notifiers keyed by the subscribed URL.
+     */
+    private ConcurrentMap<URL, ConsulNotifier> notifiers = new ConcurrentHashMap<>();
+    /**
+     * Scheduler that periodically renews the TTL checks of the registered URLs.
+     */
+    private ScheduledExecutorService ttlConsulCheckExecutor;
+    /**
+     * The ACL token
+     */
+    private String token;
+
+    private static final int CONSUL_CORE_THREAD_SIZE = 1;
+
+    private static final int DEFAULT_INDEX = -1;
+    private static final int DEFAULT_WAIT_TIME = -1;
+
+    /**
+     * Creates the Consul client for the host and port of the registry URL and starts the
+     * periodic TTL renewal task.
+     *
+     * @param url the registry URL; a port equal to {@link ConsulConstants#INVALID_PORT}
+     *            falls back to {@link ConsulConstants#DEFAULT_PORT}
+     */
+    public ConsulRegistry(URL url) {
+        super(url);
+        token = url.getParameter(TOKEN_KEY, (String) null);
+        String host = url.getHost();
+        int port = ConsulConstants.INVALID_PORT != url.getPort() ? url.getPort() : ConsulConstants.DEFAULT_PORT;
+        client = new ConsulClient(host, port);
+        checkPassInterval = url.getParameter(CHECK_PASS_INTERVAL, DEFAULT_CHECK_PASS_INTERVAL);
+        ttlConsulCheckExecutor = new ScheduledThreadPoolExecutor(CONSUL_CORE_THREAD_SIZE, new NamedThreadFactory("Ttl-Consul-Check-Executor", true));
+        ttlConsulCheckExecutor.scheduleAtFixedRate(this::checkPass, checkPassInterval / PERIOD_DENOMINATOR,
+                checkPassInterval / PERIOD_DENOMINATOR, TimeUnit.MILLISECONDS);
+    }
+
+    /**
+     * Registers the URL unless it is a consumer URL, which is ignored.
+     */
+    @Override
+    public void register(URL url) {
+        if (isConsumerSide(url)) {
+            return;
+        }
+
+        super.register(url);
+    }
+
+    /**
+     * Registers the URL as a Consul agent service, passing the token when one is configured.
+     */
+    @Override
+    public void doRegister(URL url) {
+        if (token == null) {
+            client.agentServiceRegister(buildService(url));
+        } else {
+            client.agentServiceRegister(buildService(url), token);
+        }
+    }
+
+    /**
+     * Unregisters the URL unless it is a consumer URL, which is ignored.
+     */
+    @Override
+    public void unregister(URL url) {
+        if (isConsumerSide(url)) {
+            return;
+        }
+
+        super.unregister(url);
+    }
+
+    /**
+     * Deregisters the Consul agent service whose id was derived from the URL.
+     */
+    @Override
+    public void doUnregister(URL url) {
+        if (token == null) {
+            client.agentServiceDeregister(buildId(url));
+        } else {
+            client.agentServiceDeregister(buildId(url), token);
+        }
+    }
+
+    /**
+     * Subscribes the listener unless the URL is a provider URL, which is ignored.
+     */
+    @Override
+    public void subscribe(URL url, NotifyListener listener) {
+        if (isProviderSide(url)) {
+            return;
+        }
+
+        super.subscribe(url, listener);
+    }
+
+    /**
+     * Fetches the current providers once, notifies the listener, and submits the notifier
+     * of the URL (created on first use) to the notifier executor.
+     */
+    @Override
+    public void doSubscribe(URL url, NotifyListener listener) {
+        Long index;
+        List<URL> urls;
+        if (ANY_VALUE.equals(url.getServiceInterface())) {
+            Response<Map<String, List<String>>> response = getAllServices(DEFAULT_INDEX, buildWatchTimeout(url));
+            index = response.getConsulIndex();
+            List<HealthService> services = getHealthServices(response.getValue());
+            urls = convert(services, url);
+        } else {
+            String service = url.getServiceInterface();
+            Response<List<HealthService>> response = getHealthServices(service, DEFAULT_INDEX, buildWatchTimeout(url));
+            index = response.getConsulIndex();
+            urls = convert(response.getValue(), url);
+        }
+
+        notify(url, listener, urls);
+        ConsulNotifier notifier = notifiers.computeIfAbsent(url, k -> new ConsulNotifier(url, index));
+        notifierExecutor.submit(notifier);
+    }
+
+    /**
+     * Unsubscribes the listener unless the URL is a provider URL, which is ignored.
+     */
+    @Override
+    public void unsubscribe(URL url, NotifyListener listener) {
+        if (isProviderSide(url)) {
+            return;
+        }
+
+        super.unsubscribe(url, listener);
+    }
+
+    /**
+     * Stops and forgets the notifier of the URL, if there is one.
+     */
+    @Override
+    public void doUnsubscribe(URL url, NotifyListener listener) {
+        ConsulNotifier notifier = notifiers.remove(url);
+        // remove() yields null when no notifier exists for the URL, e.g. when a second
+        // listener of the same URL unsubscribes after the notifier was already removed.
+        if (notifier != null) {
+            notifier.stop();
+        }
+    }
+
+    /**
+     * Queries Consul once for the passing instances of the service and converts them to URLs.
+     *
+     * @param url the consumer URL to look up
+     * @return the matching provider URLs, or an empty list when Consul returns nothing
+     * @throws IllegalArgumentException if {@code url} is null
+     * @throws RpcException             if the query or the conversion fails
+     */
+    @Override
+    public List<URL> lookup(URL url) {
+        if (url == null) {
+            throw new IllegalArgumentException("lookup url == null");
+        }
+        try {
+            // NOTE(review): this queries by getServiceKey() while buildService() registers under
+            // getServiceInterface() - confirm both yield the same name when group/version are set.
+            String service = url.getServiceKey();
+            Response<List<HealthService>> result = getHealthServices(service, DEFAULT_INDEX, buildWatchTimeout(url));
+            if (result == null || result.getValue() == null || result.getValue().isEmpty()) {
+                return new ArrayList<>();
+            } else {
+                return convert(result.getValue(), url);
+            }
+        } catch (Throwable e) {
+            throw new RpcException("Failed to lookup " + url + " from consul " + getUrl() + ", cause: " + e.getMessage(), e);
+        }
+    }
+
+    /**
+     * @return true when the Consul agent answers the "self" query with a non-null response
+     */
+    @Override
+    public boolean isAvailable() {
+        return client.getAgentSelf() != null;
+    }
+
+    /**
+     * Destroys the registry, stops the remaining notifier loops and shuts both executors down.
+     */
+    @Override
+    public void destroy() {
+        super.destroy();
+        // shutdown() does not interrupt running tasks, so ask every notifier loop to exit.
+        notifiers.values().forEach(ConsulNotifier::stop);
+        notifiers.clear();
+        notifierExecutor.shutdown();
+        ttlConsulCheckExecutor.shutdown();
+    }
+
+    /**
+     * Renews the TTL check of every registered URL; a failure for one URL is logged and
+     * does not prevent the remaining URLs from being renewed.
+     */
+    private void checkPass() {
+        for (URL url : getRegistered()) {
+            String checkId = buildId(url);
+            try {
+                if (token == null) {
+                    client.agentCheckPass("service:" + checkId);
+                } else {
+                    client.agentCheckPass("service:" + checkId, null, token);
+                }
+                if (logger.isDebugEnabled()) {
+                    logger.debug("check pass for url: " + url + " with check id: " + checkId);
+                }
+            } catch (Throwable t) {
+                logger.warn("fail to check pass for url: " + url + ", check id is: " + checkId, t);
+            }
+        }
+    }
+
+    /**
+     * Queries the passing instances of one service that carry the Dubbo tag.
+     *
+     * @param service      the Consul service name
+     * @param index        the Consul index passed to the query parameters
+     * @param watchTimeout the wait time passed to the query parameters
+     */
+    private Response<List<HealthService>> getHealthServices(String service, long index, int watchTimeout) {
+        HealthServicesRequest request = HealthServicesRequest.newBuilder()
+                .setTag(SERVICE_TAG)
+                .setQueryParams(new QueryParams(watchTimeout, index))
+                .setPassing(true)
+                .setToken(token)
+                .build();
+        return client.getHealthServices(service, request);
+    }
+
+    /**
+     * Queries the catalog for all services and their tags.
+     *
+     * @param index        the Consul index passed to the query parameters
+     * @param watchTimeout the wait time passed to the query parameters
+     */
+    private Response<Map<String, List<String>>> getAllServices(long index, int watchTimeout) {
+        CatalogServicesRequest request = CatalogServicesRequest.newBuilder()
+                .setQueryParams(new QueryParams(watchTimeout, index))
+                .setToken(token)
+                .build();
+        return client.getCatalogServices(request);
+    }
+
+    /**
+     * Collects the passing instances of every catalog service that carries the Dubbo tag.
+     *
+     * @param services catalog result mapping each service name to its tags
+     */
+    private List<HealthService> getHealthServices(Map<String, List<String>> services) {
+        return services.entrySet().stream()
+                .filter(s -> s.getValue().contains(SERVICE_TAG))
+                .map(s -> getHealthServices(s.getKey(), DEFAULT_INDEX, DEFAULT_WAIT_TIME).getValue())
+                .flatMap(Collection::stream)
+                .collect(Collectors.toList());
+    }
+
+    private boolean isConsumerSide(URL url) {
+        return url.getProtocol().equals(CONSUMER_PROTOCOL);
+    }
+
+    private boolean isProviderSide(URL url) {
+        return url.getProtocol().equals(PROVIDER_PROTOCOL);
+    }
+
+    /**
+     * Converts Consul instances to the Dubbo URLs stored in their metadata, keeping only the
+     * URLs that match the consumer URL.
+     *
+     * @return the matching URLs, or a single "empty" protocol URL when {@code services} is empty
+     */
+    private List<URL> convert(List<HealthService> services, URL consumerURL) {
+        if (CollectionUtils.isEmpty(services)) {
+            return emptyURL(consumerURL);
+        }
+        return services.stream()
+                .map(HealthService::getService)
+                .filter(Objects::nonNull)
+                .map(HealthService.Service::getMeta)
+                .filter(m -> m != null && m.containsKey(URL_META_KEY))
+                .map(m -> m.get(URL_META_KEY))
+                .map(URL::valueOf)
+                .filter(url -> UrlUtils.isMatch(consumerURL, url))
+                .collect(Collectors.toList());
+    }
+
+    /**
+     * Builds the single-element list holding the consumer URL rewritten to the "empty" protocol.
+     */
+    private List<URL> emptyURL(URL consumerURL) {
+        // No Category Parameter
+        URL empty = URLBuilder.from(consumerURL)
+                .setProtocol(EMPTY_PROTOCOL)
+                .removeParameter(CATEGORY_KEY)
+                .build();
+        List<URL> result = new ArrayList<URL>();
+        result.add(empty);
+        return result;
+    }
+
+    /**
+     * Describes the provider URL as a Consul service: address, port, id, name, TTL check,
+     * tags, and the full URL stored under the metadata key.
+     */
+    private NewService buildService(URL url) {
+        NewService service = new NewService();
+        service.setAddress(url.getHost());
+        service.setPort(url.getPort());
+        service.setId(buildId(url));
+        service.setName(url.getServiceInterface());
+        service.setCheck(buildCheck(url));
+        service.setTags(buildTags(url));
+        service.setMeta(Collections.singletonMap(URL_META_KEY, url.toFullString()));
+        return service;
+    }
+
+    /**
+     * Builds one "key=value" tag per URL parameter plus the Dubbo service tag.
+     */
+    private List<String> buildTags(URL url) {
+        Map<String, String> params = url.getParameters();
+        // Collect into an explicit ArrayList: Collectors.toList() does not guarantee a mutable list.
+        List<String> tags = params.entrySet().stream()
+                .map(k -> k.getKey() + "=" + k.getValue())
+                .collect(Collectors.toCollection(ArrayList::new));
+        tags.add(SERVICE_TAG);
+        return tags;
+    }
+
+    private String buildId(URL url) {
+        // let's simply use url's hashcode to generate unique service id for now
+        return Integer.toHexString(url.hashCode());
+    }
+
+    /**
+     * Builds the TTL check: the TTL is the check interval in whole seconds, the deregistration
+     * delay comes from the URL parameter or its default.
+     */
+    private NewService.Check buildCheck(URL url) {
+        NewService.Check check = new NewService.Check();
+        check.setTtl((checkPassInterval / ONE_THOUSAND) + "s");
+        check.setDeregisterCriticalServiceAfter(url.getParameter(DEREGISTER_AFTER, DEFAULT_DEREGISTER_TIME));
+        return check;
+    }
+
+    /**
+     * @return the watch timeout of the URL (or its default) converted from milliseconds to seconds
+     */
+    private int buildWatchTimeout(URL url) {
+        return url.getParameter(ConsulConstants.WATCH_TIMEOUT, ConsulConstants.DEFAULT_WATCH_TIMEOUT) / ONE_THOUSAND;
+    }
+
+    /**
+     * Loop that keeps querying Consul for one subscribed URL and notifies the listeners of
+     * that URL whenever the returned Consul index is greater than the last one seen.
+     */
+    private class ConsulNotifier implements Runnable {
+        private URL url;
+        private long consulIndex;
+        // volatile: stop() is called from a different thread than the one executing run()
+        private volatile boolean running;
+
+        ConsulNotifier(URL url, long consulIndex) {
+            this.url = url;
+            this.consulIndex = consulIndex;
+            this.running = true;
+        }
+
+        @Override
+        public void run() {
+            while (this.running) {
+                if (ANY_VALUE.equals(url.getServiceInterface())) {
+                    processServices();
+                } else {
+                    processService();
+                }
+            }
+        }
+
+        /**
+         * Queries the single service of the URL and notifies the listeners if the index advanced.
+         */
+        private void processService() {
+            String service = url.getServiceKey();
+            Response<List<HealthService>> response = getHealthServices(service, consulIndex, buildWatchTimeout(url));
+            Long currentIndex = response.getConsulIndex();
+            if (currentIndex != null && currentIndex > consulIndex) {
+                consulIndex = currentIndex;
+                List<HealthService> services = response.getValue();
+                List<URL> urls = convert(services, url);
+                for (NotifyListener listener : getSubscribed().get(url)) {
+                    doNotify(url, listener, urls);
+                }
+            }
+        }
+
+        /**
+         * Queries all catalog services and notifies the listeners if the index advanced.
+         */
+        private void processServices() {
+            Response<Map<String, List<String>>> response = getAllServices(consulIndex, buildWatchTimeout(url));
+            Long currentIndex = response.getConsulIndex();
+            if (currentIndex != null && currentIndex > consulIndex) {
+                consulIndex = currentIndex;
+                List<HealthService> services = getHealthServices(response.getValue());
+                List<URL> urls = convert(services, url);
+                for (NotifyListener listener : getSubscribed().get(url)) {
+                    doNotify(url, listener, urls);
+                }
+            }
+        }
+
+        /**
+         * Asks the loop to exit; it ends once the query in progress has returned.
+         */
+        void stop() {
+            this.running = false;
+        }
+    }
+}
diff --git a/dubbo-registry-extensions/dubbo-registry-consul/src/main/java/org/apache/dubbo/registry/consul/ConsulRegistryFactory.java b/dubbo-registry-extensions/dubbo-registry-consul/src/main/java/org/apache/dubbo/registry/consul/ConsulRegistryFactory.java
new file mode 100644
index 0000000..c36f009
--- /dev/null
+++ b/dubbo-registry-extensions/dubbo-registry-consul/src/main/java/org/apache/dubbo/registry/consul/ConsulRegistryFactory.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dubbo.registry.consul;
+
+import org.apache.dubbo.common.URL;
+import org.apache.dubbo.registry.Registry;
+import org.apache.dubbo.registry.support.AbstractRegistryFactory;
+
+/**
+ * Registry center factory implementation for Consul.
+ * <p>
+ * Produces a {@link ConsulRegistry} for a given registry {@link URL}.
+ */
+public class ConsulRegistryFactory extends AbstractRegistryFactory {
+    /**
+     * Creates a new {@link ConsulRegistry} for the supplied registry URL.
+     *
+     * @param url the registry URL passed to the {@link ConsulRegistry} constructor
+     * @return a newly constructed {@link ConsulRegistry}
+     */
+    @Override
+    protected Registry createRegistry(URL url) {
+        return new ConsulRegistry(url);
+    }
+}
diff --git a/dubbo-registry-extensions/dubbo-registry-consul/src/main/java/org/apache/dubbo/registry/consul/ConsulServiceDiscovery.java b/dubbo-registry-extensions/dubbo-registry-consul/src/main/java/org/apache/dubbo/registry/consul/ConsulServiceDiscovery.java
new file mode 100644
index 0000000..2771a99
--- /dev/null
+++ b/dubbo-registry-extensions/dubbo-registry-consul/src/main/java/org/apache/dubbo/registry/consul/ConsulServiceDiscovery.java
@@ -0,0 +1,469 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dubbo.registry.consul;
+
+import org.apache.dubbo.common.URL;
+import org.apache.dubbo.common.logger.Logger;
+import org.apache.dubbo.common.logger.LoggerFactory;
+import org.apache.dubbo.common.utils.CollectionUtils;
+import org.apache.dubbo.common.utils.NamedThreadFactory;
+import org.apache.dubbo.common.utils.StringUtils;
+import org.apache.dubbo.registry.client.AbstractServiceDiscovery;
+import org.apache.dubbo.registry.client.DefaultServiceInstance;
+import org.apache.dubbo.registry.client.ServiceInstance;
+import org.apache.dubbo.registry.client.event.ServiceInstancesChangedEvent;
+import org.apache.dubbo.registry.client.event.listener.ServiceInstancesChangedListener;
+import org.apache.dubbo.rpc.model.ApplicationModel;
+
+import com.ecwid.consul.v1.ConsistencyMode;
+import com.ecwid.consul.v1.ConsulClient;
+import com.ecwid.consul.v1.QueryParams;
+import com.ecwid.consul.v1.Response;
+import com.ecwid.consul.v1.agent.model.NewService;
+import com.ecwid.consul.v1.catalog.CatalogServicesRequest;
+import com.ecwid.consul.v1.health.HealthServicesRequest;
+import com.ecwid.consul.v1.health.model.HealthService;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Base64;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+import static java.util.concurrent.Executors.newCachedThreadPool;
+import static org.apache.dubbo.common.constants.CommonConstants.COMMA_SEPARATOR_CHAR;
+import static org.apache.dubbo.common.constants.CommonConstants.SEMICOLON_SPLIT_PATTERN;
+import static org.apache.dubbo.registry.consul.AbstractConsulRegistry.CHECK_PASS_INTERVAL;
+import static org.apache.dubbo.registry.consul.AbstractConsulRegistry.DEFAULT_CHECK_PASS_INTERVAL;
+import static org.apache.dubbo.registry.consul.AbstractConsulRegistry.DEFAULT_DEREGISTER_TIME;
+import static org.apache.dubbo.registry.consul.AbstractConsulRegistry.DEREGISTER_AFTER;
+import static org.apache.dubbo.registry.consul.AbstractConsulRegistry.ONE_THOUSAND;
+import static org.apache.dubbo.registry.consul.AbstractConsulRegistry.PERIOD_DENOMINATOR;
+import static org.apache.dubbo.registry.consul.ConsulConstants.DEFAULT_WATCH_TIMEOUT;
+import static org.apache.dubbo.registry.consul.ConsulConstants.WATCH_TIMEOUT;
+import static org.apache.dubbo.registry.consul.ConsulParameter.ACL_TOKEN;
+import static org.apache.dubbo.registry.consul.ConsulParameter.CONSISTENCY_MODE;
+import static org.apache.dubbo.registry.consul.ConsulParameter.DEFAULT_ZONE_METADATA_NAME;
+import static org.apache.dubbo.registry.consul.ConsulParameter.INSTANCE_GROUP;
+import static org.apache.dubbo.registry.consul.ConsulParameter.INSTANCE_ZONE;
+import static org.apache.dubbo.registry.consul.ConsulParameter.TAGS;
+
+public class ConsulServiceDiscovery extends AbstractServiceDiscovery {
+
+    // Registry URL parameter whose value becomes the tag filter of health queries.
+    private static final String QUERY_TAG = "consul_query_tag";
+    // Registry URL parameter whose value is split into the tags appended on registration.
+    private static final String REGISTER_TAG = "consul_register_tag";
+
+    // Tags parsed from REGISTER_TAG; appended last when the tag list of an instance is built.
+    private List<String> registeringTags = new ArrayList<>();
+    // Value of QUERY_TAG; passed as the tag of every health services request.
+    private String tag;
+    // Consul client built from the registry URL's host and port.
+    private ConsulClient client;
+    // Runs the ConsulNotifier watch loops on daemon threads.
+    private ExecutorService notifierExecutor = newCachedThreadPool(
+        new NamedThreadFactory("dubbo-service-discovery-consul-notifier", true));
+    // One notifier per watched service name.
+    private Map<String, ConsulNotifier> notifiers = new ConcurrentHashMap<>();
+    // Sends the periodic TTL "check pass" calls for registered instances.
+    private TtlScheduler ttlScheduler;
+    // TTL check interval read from CHECK_PASS_INTERVAL; presumably milliseconds (it is divided by ONE_THOUSAND to build the "<n>s" TTL) - TODO confirm.
+    private long checkPassInterval;
+    // The registry URL this discovery was created from.
+    private URL url;
+
+    // ACL token passed to agent register/deregister calls and to the catalog request.
+    private String aclToken;
+
+    // Tags read from the TAGS parameter; they start the tag list of every registered instance.
+    private List<String> tags;
+
+    // Assigned in the constructor; not read anywhere else in this class.
+    private ConsistencyMode consistencyMode;
+
+    // Key used for the "<name>=<zone>" tag added when an instance zone is configured.
+    private String defaultZoneMetadataName;
+
+    /**
+     * Service instance zone.
+     */
+    private String instanceZone;
+
+    /**
+     * Service instance group.
+     */
+    private String instanceGroup;
+
+    /**
+     * Creates the discovery from the registry URL: builds the {@link ConsulClient} for the
+     * URL's host and port, creates the TTL heartbeat scheduler and reads the Consul-specific
+     * settings (query tag, registering tags, ACL token, tags, consistency mode, zone
+     * metadata name, instance zone and instance group) from the URL.
+     *
+     * @param applicationModel forwarded to the {@link AbstractServiceDiscovery} constructor
+     * @param registryURL      registry URL carrying the Consul address and parameters
+     */
+    public ConsulServiceDiscovery(ApplicationModel applicationModel, URL registryURL) {
+        super(applicationModel, registryURL);
+        this.url = registryURL;
+        String host = url.getHost();
+        // Use the default Consul port when the URL carries the "invalid port" marker.
+        int port = ConsulConstants.INVALID_PORT != url.getPort() ? url.getPort() : ConsulConstants.DEFAULT_PORT;
+        checkPassInterval = url.getParameter(CHECK_PASS_INTERVAL, DEFAULT_CHECK_PASS_INTERVAL);
+        client = new ConsulClient(host, port);
+        ttlScheduler = new TtlScheduler(checkPassInterval, client);
+        this.tag = registryURL.getParameter(QUERY_TAG);
+        this.registeringTags.addAll(getRegisteringTags(url));
+        this.aclToken = ACL_TOKEN.getValue(registryURL);
+        this.tags = getTags(registryURL);
+        this.consistencyMode = getConsistencyMode(registryURL);
+        this.defaultZoneMetadataName = DEFAULT_ZONE_METADATA_NAME.getValue(registryURL);
+        this.instanceZone = INSTANCE_ZONE.getValue(registryURL);
+        this.instanceGroup = INSTANCE_GROUP.getValue(registryURL);
+    }
+
+    /**
+     * Get the {@link ConsistencyMode} configured on the registry {@link URL}.
+     * <p>
+     * The configured value is passed to {@link ConsistencyMode#valueOf(String)} unchanged,
+     * so it has to match the name of an enum constant exactly.
+     *
+     * @param registryURL the {@link URL} of registry
+     * @return non-null, {@link ConsistencyMode#DEFAULT} as default
+     * @throws IllegalArgumentException if the configured value is not the name of a
+     *                                  {@link ConsistencyMode} constant
+     * @since 2.7.8
+     */
+    private ConsistencyMode getConsistencyMode(URL registryURL) {
+        String value = CONSISTENCY_MODE.getValue(registryURL);
+        if (StringUtils.isNotEmpty(value)) {
+            return ConsistencyMode.valueOf(value);
+        }
+        return ConsistencyMode.DEFAULT;
+    }
+
+    /**
+     * Get the "tags" from the {@link URL} of registry, split on
+     * {@code COMMA_SEPARATOR_CHAR}.
+     *
+     * @param registryURL the {@link URL} of registry
+     * @return the configured tags; NOTE(review): non-null only if
+     *         {@code StringUtils.splitToList} tolerates a missing value - confirm
+     * @since 2.7.8
+     */
+    private List<String> getTags(URL registryURL) {
+        String value = TAGS.getValue(registryURL);
+        return StringUtils.splitToList(value, COMMA_SEPARATOR_CHAR);
+    }
+
+    /**
+     * Reads the {@code consul_register_tag} parameter of the given URL and splits it
+     * with {@code SEMICOLON_SPLIT_PATTERN}.
+     *
+     * @param url the URL to read the parameter from
+     * @return a new mutable list of the configured tags, empty when the parameter is
+     *         missing or empty
+     */
+    private List<String> getRegisteringTags(URL url) {
+        String configured = url.getParameter(REGISTER_TAG);
+        if (StringUtils.isEmpty(configured)) {
+            return new ArrayList<>();
+        }
+        return new ArrayList<>(Arrays.asList(SEMICOLON_SPLIT_PATTERN.split(configured)));
+    }
+
+    /**
+     * Stops every notifier, forgets them, then shuts down the notifier executor and the
+     * TTL heartbeat scheduler.
+     */
+    @Override
+    protected void doDestroy() throws Exception {
+        for (ConsulNotifier watcher : notifiers.values()) {
+            if (watcher != null) {
+                watcher.stop();
+            }
+        }
+        notifiers.clear();
+        notifierExecutor.shutdownNow();
+        ttlScheduler.stop();
+    }
+
+    /**
+     * Registers the instance with the Consul agent and then starts its TTL heartbeat.
+     * <p>
+     * The heartbeat is scheduled only after the agent call has returned, so a failed
+     * registration does not leave a heartbeat task running for an instance that was
+     * never registered.
+     *
+     * @param serviceInstance the instance to register
+     */
+    @Override
+    public void doRegister(ServiceInstance serviceInstance) {
+        NewService consulService = buildService(serviceInstance);
+        client.agentServiceRegister(consulService, aclToken);
+        ttlScheduler.add(consulService.getId());
+    }
+
+    /**
+     * Cancels the TTL heartbeat of the instance and deregisters it from the Consul agent.
+     *
+     * @param serviceInstance the instance to remove; its id is recomputed with
+     *                        {@code buildId}
+     */
+    @Override
+    protected void doUnregister(ServiceInstance serviceInstance) {
+        String id = buildId(serviceInstance);
+        // Stop heartbeating first so no "check pass" is sent for a deregistered service.
+        ttlScheduler.remove(id);
+        client.agentServiceDeregister(id, aclToken);
+    }
+
+    /**
+     * Attaches the listener to the watch loop of every service name it is interested in,
+     * creating and starting a {@link ConsulNotifier} for names that are not watched yet.
+     * <p>
+     * A notifier is submitted to the executor exactly once, when it is created. Adding a
+     * further listener to an already watched service only registers the listener, so no
+     * second loop is started on the same notifier.
+     *
+     * @param listener the listener to add
+     */
+    @Override
+    public synchronized void addServiceInstancesChangedListener(ServiceInstancesChangedListener listener)
+        throws NullPointerException, IllegalArgumentException {
+        Set<String> serviceNames = listener.getServiceNames();
+        for (String serviceName : serviceNames) {
+            ConsulNotifier notifier = notifiers.get(serviceName);
+            if (notifier != null) {
+                // A watch loop already runs for this service; just attach the listener.
+                notifier.addListener(listener);
+                continue;
+            }
+            Response<List<HealthService>> response = getHealthServices(serviceName, -1, buildWatchTimeout());
+            Long consulIndex = response.getConsulIndex();
+            // The index may be absent; fall back to -1 (the value used for the initial
+            // query) instead of failing on auto-unboxing a null Long.
+            notifier = new ConsulNotifier(serviceName, consulIndex == null ? -1L : consulIndex);
+            notifier.addListener(listener);
+            notifiers.put(serviceName, notifier);
+            notifierExecutor.execute(notifier);
+        }
+    }
+
+    /**
+     * Detaches the listener from the notifier of each of its service names. A notifier
+     * left without listeners is stopped and dropped.
+     *
+     * @param listener the listener to remove
+     */
+    @Override
+    public synchronized void removeServiceInstancesChangedListener(ServiceInstancesChangedListener listener) throws IllegalArgumentException {
+        for (String serviceName : listener.getServiceNames()) {
+            ConsulNotifier watcher = notifiers.get(serviceName);
+            if (watcher == null) {
+                continue;
+            }
+            watcher.removeListener(listener);
+            if (watcher.getListenerCount() == 0) {
+                watcher.stop();
+                notifiers.remove(serviceName);
+            }
+        }
+    }
+
+    /**
+     * Lists the names of all services in the Consul catalog, using the default query
+     * parameters and the configured ACL token.
+     *
+     * @return the key set of the catalog services map returned by Consul
+     */
+    @Override
+    public Set<String> getServices() {
+        CatalogServicesRequest catalogRequest = CatalogServicesRequest.newBuilder()
+            .setQueryParams(QueryParams.DEFAULT)
+            .setToken(aclToken)
+            .build();
+        Response<Map<String, List<String>>> catalog = this.client.getCatalogServices(catalogRequest);
+        Map<String, List<String>> tagsByService = catalog.getValue();
+        return tagsByService.keySet();
+    }
+
+    /**
+     * Fetches the passing health services of the given service name and converts them
+     * to {@link ServiceInstance}s.
+     *
+     * @param serviceName the service name to look up
+     * @return the converted instances
+     */
+    @Override
+    public List<ServiceInstance> getInstances(String serviceName) throws NullPointerException {
+        int watchTimeout = buildWatchTimeout();
+        List<HealthService> passing = getHealthServices(serviceName, -1, watchTimeout).getValue();
+        return convert(passing);
+    }
+
+    /**
+     * Converts Consul health services into {@link ServiceInstance}s, copying the
+     * metadata resolved by {@code getMetadata} into each instance.
+     *
+     * @param services the health services returned by Consul
+     * @return a new list with one instance per health service, in the same order
+     */
+    private List<ServiceInstance> convert(List<HealthService> services) {
+        List<ServiceInstance> instances = new ArrayList<>();
+        for (HealthService healthService : services) {
+            HealthService.Service consulService = healthService.getService();
+            ServiceInstance instance = new DefaultServiceInstance(
+                consulService.getService(),
+                consulService.getAddress(),
+                consulService.getPort(),
+                applicationModel);
+            instance.getMetadata().putAll(getMetadata(consulService));
+            instances.add(instance);
+        }
+        return instances;
+    }
+
+    /**
+     * Queries Consul for the passing instances of a service, filtered by the configured
+     * query tag.
+     * <p>
+     * The configured ACL token is sent with the request, as it already is for the
+     * register, deregister and catalog calls, so that health queries also work against
+     * an ACL-protected agent.
+     * NOTE(review): the configured consistency mode is still not applied to this query.
+     *
+     * @param service      the service name to query
+     * @param index        the Consul index to wait on
+     * @param watchTimeout the wait time handed to {@link QueryParams}
+     * @return the raw Consul response
+     */
+    private Response<List<HealthService>> getHealthServices(String service, long index, int watchTimeout) {
+        HealthServicesRequest request = HealthServicesRequest.newBuilder()
+            .setTag(tag)
+            .setQueryParams(new QueryParams(watchTimeout, index))
+            .setPassing(true)
+            .setToken(aclToken)
+            .build();
+        return client.getHealthServices(service, request);
+    }
+
+    /**
+     * Resolves the metadata of a Consul service: the decoded service meta when it is
+     * non-empty, otherwise the metadata derived from the service tags.
+     *
+     * @param service the Consul service
+     * @return the resolved metadata
+     */
+    private Map<String, String> getMetadata(HealthService.Service service) {
+        Map<String, String> decodedMeta = decodeMetadata(service.getMeta());
+        return CollectionUtils.isEmptyMap(decodedMeta)
+            ? getScCompatibleMetadata(service.getTags())
+            : decodedMeta;
+    }
+
+    /**
+     * Builds metadata from {@code key=value} style tags. A tag without a value maps the
+     * key to itself; when a tag splits into more than two parts, everything after the
+     * first part is re-joined with {@code =} and used as the value.
+     *
+     * @param tags the tags of a Consul service, may be {@code null}
+     * @return an insertion-ordered map, empty when {@code tags} is {@code null}
+     */
+    private Map<String, String> getScCompatibleMetadata(List<String> tags) {
+        LinkedHashMap<String, String> result = new LinkedHashMap<>();
+        if (tags == null) {
+            return result;
+        }
+        for (String rawTag : tags) {
+            String[] pieces = StringUtils.delimitedListToStringArray(rawTag, "=");
+            if (pieces.length == 0) {
+                continue;
+            }
+            String key = pieces[0];
+            String value;
+            if (pieces.length == 1) {
+                value = key;
+            } else if (pieces.length == 2) {
+                value = pieces[1];
+            } else {
+                String[] remainder = Arrays.copyOfRange(pieces, 1, pieces.length);
+                value = StringUtils.arrayToDelimitedString(remainder, "=");
+            }
+            result.put(key, value);
+        }
+        return result;
+    }
+
+    /**
+     * Builds the Consul {@link NewService} for an instance: address, port, id, name,
+     * TTL check and tags.
+     * <p>
+     * No service meta is set here; the instance metadata is only carried in the tags
+     * produced by {@code buildTags}.
+     *
+     * @param serviceInstance the instance to describe
+     * @return the populated registration request
+     */
+    private NewService buildService(ServiceInstance serviceInstance) {
+        NewService service = new NewService();
+        service.setAddress(serviceInstance.getHost());
+        service.setPort(serviceInstance.getPort());
+        service.setId(buildId(serviceInstance));
+        service.setName(serviceInstance.getServiceName());
+        service.setCheck(buildCheck(serviceInstance));
+        service.setTags(buildTags(serviceInstance));
+        return service;
+    }
+
+    /**
+     * Derives the Consul service id of an instance: the hexadecimal form of its
+     * {@code hashCode()}.
+     * NOTE(review): the id is only as unique and stable as the instance's
+     * {@code hashCode()}; confirm it does not change between register and unregister.
+     *
+     * @param serviceInstance the instance to identify
+     * @return the hexadecimal hash code string
+     */
+    private String buildId(ServiceInstance serviceInstance) {
+        return Integer.toHexString(serviceInstance.hashCode());
+    }
+
+    /**
+     * Assembles the tags registered for an instance, in this order: the configured
+     * tags, the zone tag (when a zone is set), the {@code group=} tag (when a group is
+     * set), one {@code key=value} tag per metadata entry and finally the registering tags.
+     *
+     * @param serviceInstance the instance whose metadata is turned into tags
+     * @return a new mutable list of tags
+     */
+    private List<String> buildTags(ServiceInstance serviceInstance) {
+        List<String> result = new LinkedList<>(this.tags);
+
+        if (StringUtils.isNotEmpty(instanceZone)) {
+            result.add(defaultZoneMetadataName + "=" + instanceZone);
+        }
+        if (StringUtils.isNotEmpty(instanceGroup)) {
+            result.add("group=" + instanceGroup);
+        }
+
+        for (Map.Entry<String, String> entry : serviceInstance.getMetadata().entrySet()) {
+            result.add(entry.getKey() + "=" + entry.getValue());
+        }
+
+        result.addAll(registeringTags);
+        return result;
+    }
+
+    /**
+     * Decodes the Base64-encoded keys of a metadata map; values are copied unchanged.
+     * <p>
+     * The decoded bytes are interpreted as UTF-8 explicitly, so the result does not
+     * depend on the platform default charset of the JVM.
+     *
+     * @param metadata the metadata as stored in Consul, may be {@code null}
+     * @return a new map with decoded keys, or {@code null} when {@code metadata} is {@code null}
+     * @throws IllegalArgumentException if a key is not valid Base64
+     */
+    private Map<String, String> decodeMetadata(Map<String, String> metadata) {
+        if (metadata == null) {
+            return null;
+        }
+        Map<String, String> decoded = new HashMap<>(metadata.size());
+        metadata.forEach((k, v) -> decoded.put(
+            new String(Base64.getDecoder().decode(k), java.nio.charset.StandardCharsets.UTF_8), v));
+        return decoded;
+    }
+
+    /**
+     * Builds the TTL check of an instance. The TTL is the check-pass interval divided
+     * by {@code ONE_THOUSAND} with an {@code "s"} suffix; the deregistration delay comes
+     * from the instance metadata or falls back to {@code DEFAULT_DEREGISTER_TIME}.
+     *
+     * @param serviceInstance the instance whose metadata may override the delay
+     * @return the configured check
+     */
+    private NewService.Check buildCheck(ServiceInstance serviceInstance) {
+        NewService.Check ttlCheck = new NewService.Check();
+        ttlCheck.setTtl((checkPassInterval / ONE_THOUSAND) + "s");
+        String deregisterAfter = serviceInstance.getMetadata().get(DEREGISTER_AFTER);
+        if (deregisterAfter == null) {
+            deregisterAfter = DEFAULT_DEREGISTER_TIME;
+        }
+        ttlCheck.setDeregisterCriticalServiceAfter(deregisterAfter);
+        return ttlCheck;
+    }
+
+    /**
+     * Returns the {@code WATCH_TIMEOUT} parameter of the registry URL (or
+     * {@code DEFAULT_WATCH_TIMEOUT} when absent) divided by {@code ONE_THOUSAND}.
+     * Presumably this converts milliseconds to seconds - TODO confirm.
+     *
+     * @return the wait time passed to Consul queries
+     */
+    private int buildWatchTimeout() {
+        return url.getParameter(WATCH_TIMEOUT, DEFAULT_WATCH_TIMEOUT) / ONE_THOUSAND;
+    }
+
+    /**
+     * Watch loop for a single service name. While running it repeatedly queries the
+     * health services of the service and publishes a
+     * {@link ServiceInstancesChangedEvent} to its listeners whenever Consul reports a
+     * newer index.
+     * <p>
+     * NOTE(review): an exception thrown by the query is not caught here; it propagates
+     * out of {@link #run()} and ends the loop.
+     */
+    private class ConsulNotifier implements Runnable {
+        private final String serviceName;
+        // Last Consul index seen; read and updated by processService().
+        private long consulIndex;
+        // Written by stop() on a different thread than the one executing run(), so it
+        // must be volatile for the loop to observe the change.
+        private volatile boolean running;
+
+        private final List<ServiceInstancesChangedListener> listeners;
+
+        /**
+         * @param serviceName the service to watch
+         * @param consulIndex the index to start waiting from
+         */
+        ConsulNotifier(String serviceName, long consulIndex) {
+            this.serviceName = serviceName;
+            this.consulIndex = consulIndex;
+            this.running = true;
+            this.listeners = new CopyOnWriteArrayList<>();
+        }
+
+        /**
+         * Polls Consul until {@link #stop()} is called.
+         */
+        @Override
+        public void run() {
+            while (this.running) {
+                processService();
+            }
+        }
+
+        /**
+         * Performs one health query and notifies the listeners when the returned index
+         * is newer than the last one seen.
+         */
+        private void processService() {
+            Response<List<HealthService>> response = getHealthServices(serviceName, consulIndex, Integer.MAX_VALUE);
+            Long currentIndex = response.getConsulIndex();
+            if (currentIndex != null && currentIndex > consulIndex) {
+                consulIndex = currentIndex;
+                List<HealthService> services = response.getValue();
+                List<ServiceInstance> serviceInstances = convert(services);
+                listeners.forEach(l -> l.onEvent(new ServiceInstancesChangedEvent(serviceName, serviceInstances)));
+            }
+        }
+
+        /**
+         * Adds a listener to be notified of instance changes.
+         */
+        public void addListener(ServiceInstancesChangedListener listener) {
+            this.listeners.add(listener);
+        }
+
+        /**
+         * Removes a previously added listener.
+         */
+        public void removeListener(ServiceInstancesChangedListener listener) {
+            this.listeners.remove(listener);
+        }
+
+        /**
+         * @return the number of listeners currently attached
+         */
+        public int getListenerCount() {
+            return this.listeners.size();
+        }
+
+        /**
+         * Asks the loop to end; it exits once the query in progress has returned.
+         */
+        void stop() {
+            this.running = false;
+        }
+    }
+
+    /**
+     * Periodically reports registered service instances as passing to the Consul agent
+     * so that their TTL checks do not expire.
+     */
+    private static class TtlScheduler {
+
+        private static final Logger logger = LoggerFactory.getLogger(TtlScheduler.class);
+
+        // Heartbeat task per instance id.
+        private final Map<String, ScheduledFuture<?>> serviceHeartbeats = new ConcurrentHashMap<>();
+
+        private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
+
+        private final long checkInterval;
+
+        private final ConsulClient client;
+
+        /**
+         * @param checkInterval the TTL check interval; heartbeats are sent every
+         *                      {@code checkInterval / PERIOD_DENOMINATOR} milliseconds
+         * @param client        the Consul client used to send the heartbeats
+         */
+        public TtlScheduler(long checkInterval, ConsulClient client) {
+            this.checkInterval = checkInterval;
+            this.client = client;
+        }
+
+        /**
+         * Add a service to the checks loop. A heartbeat already scheduled for the same
+         * id is cancelled and replaced.
+         *
+         * @param instanceId instance id
+         */
+        public void add(String instanceId) {
+            long period = checkInterval / PERIOD_DENOMINATOR;
+            ScheduledFuture<?> task = this.scheduler.scheduleAtFixedRate(
+                new ConsulHeartbeatTask(instanceId), period, period, TimeUnit.MILLISECONDS);
+            ScheduledFuture<?> previousTask = this.serviceHeartbeats.put(instanceId, task);
+            if (previousTask != null) {
+                previousTask.cancel(true);
+            }
+        }
+
+        /**
+         * Removes a service from the checks loop and cancels its heartbeat.
+         *
+         * @param instanceId instance id
+         */
+        public void remove(String instanceId) {
+            // Remove and fetch in one step so a concurrent add() cannot lose its task.
+            ScheduledFuture<?> task = this.serviceHeartbeats.remove(instanceId);
+            if (task != null) {
+                task.cancel(true);
+            }
+        }
+
+        /**
+         * Sends one "check pass" for the TTL check of a service.
+         */
+        private class ConsulHeartbeatTask implements Runnable {
+
+            private final String checkId;
+
+            ConsulHeartbeatTask(String serviceId) {
+                // The TTL check of a service is addressed as "service:<serviceId>".
+                this.checkId = serviceId.startsWith("service:") ? serviceId : "service:" + serviceId;
+            }
+
+            @Override
+            public void run() {
+                try {
+                    TtlScheduler.this.client.agentCheckPass(this.checkId);
+                    if (logger.isDebugEnabled()) {
+                        logger.debug("Sending consul heartbeat for: " + this.checkId);
+                    }
+                } catch (Exception e) {
+                    // An exception escaping a scheduleAtFixedRate task suppresses all
+                    // later executions; log and keep the heartbeat alive instead.
+                    logger.warn("Failed to send consul heartbeat for: " + this.checkId, e);
+                }
+            }
+
+        }
+
+        /**
+         * Shuts the scheduler down, cancelling all heartbeats.
+         */
+        public void stop() {
+            scheduler.shutdownNow();
+        }
+
+    }
+}
diff --git a/dubbo-registry-extensions/dubbo-registry-consul/src/main/java/org/apache/dubbo/registry/consul/ConsulServiceDiscoveryFactory.java b/dubbo-registry-extensions/dubbo-registry-consul/src/main/java/org/apache/dubbo/registry/consul/ConsulServiceDiscoveryFactory.java
new file mode 100644
index 0000000..dcb0b5d
--- /dev/null
+++ b/dubbo-registry-extensions/dubbo-registry-consul/src/main/java/org/apache/dubbo/registry/consul/ConsulServiceDiscoveryFactory.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dubbo.registry.consul;
+
+import org.apache.dubbo.common.URL;
+import org.apache.dubbo.registry.client.AbstractServiceDiscoveryFactory;
+import org.apache.dubbo.registry.client.ServiceDiscovery;
+
+/**
+ * Service discovery factory implementation for Consul.
+ * <p>
+ * Produces a {@link ConsulServiceDiscovery} for a given registry {@link URL}.
+ */
+public class ConsulServiceDiscoveryFactory extends AbstractServiceDiscoveryFactory {
+
+    /**
+     * Creates a new {@link ConsulServiceDiscovery} from this factory's application
+     * model and the supplied registry URL.
+     *
+     * @param registryURL the registry URL passed to the discovery constructor
+     * @return a newly constructed {@link ConsulServiceDiscovery}
+     */
+    @Override
+    protected ServiceDiscovery createDiscovery(URL registryURL) {
+        return new ConsulServiceDiscovery(applicationModel, registryURL);
+    }
+
+}
diff --git a/dubbo-registry-extensions/dubbo-registry-consul/src/main/resources/META-INF/dubbo/internal/org.apache.dubbo.registry.RegistryFactory b/dubbo-registry-extensions/dubbo-registry-consul/src/main/resources/META-INF/dubbo/internal/org.apache.dubbo.registry.RegistryFactory
new file mode 100644
index 0000000..7aea18f
--- /dev/null
+++ b/dubbo-registry-extensions/dubbo-registry-consul/src/main/resources/META-INF/dubbo/internal/org.apache.dubbo.registry.RegistryFactory
@@ -0,0 +1 @@
+consul=org.apache.dubbo.registry.consul.ConsulRegistryFactory
diff --git a/dubbo-registry-extensions/dubbo-registry-consul/src/main/resources/META-INF/dubbo/internal/org.apache.dubbo.registry.client.ServiceDiscoveryFactory b/dubbo-registry-extensions/dubbo-registry-consul/src/main/resources/META-INF/dubbo/internal/org.apache.dubbo.registry.client.ServiceDiscoveryFactory
new file mode 100644
index 0000000..a0f1252
--- /dev/null
+++ b/dubbo-registry-extensions/dubbo-registry-consul/src/main/resources/META-INF/dubbo/internal/org.apache.dubbo.registry.client.ServiceDiscoveryFactory
@@ -0,0 +1 @@
+consul=org.apache.dubbo.registry.consul.ConsulServiceDiscoveryFactory
\ No newline at end of file
diff --git a/dubbo-registry-extensions/dubbo-registry-consul/src/test/java/org/apache/dubbo/registry/consul/ConsulRegistryTest.java b/dubbo-registry-extensions/dubbo-registry-consul/src/test/java/org/apache/dubbo/registry/consul/ConsulRegistryTest.java
new file mode 100644
index 0000000..fcd0c99
--- /dev/null
+++ b/dubbo-registry-extensions/dubbo-registry-consul/src/test/java/org/apache/dubbo/registry/consul/ConsulRegistryTest.java
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dubbo.registry.consul;
+
+import org.apache.dubbo.common.URL;
+import org.apache.dubbo.common.status.Status;
+import org.apache.dubbo.common.utils.NetUtils;
+import org.apache.dubbo.registry.NotifyListener;
+import org.apache.dubbo.registry.Registry;
+import org.apache.dubbo.registry.status.RegistryStatusChecker;
+import org.apache.dubbo.rpc.model.ApplicationModel;
+
+import com.pszymczyk.consul.ConsulProcess;
+import com.pszymczyk.consul.ConsulStarterBuilder;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.CoreMatchers.not;
+import static org.hamcrest.CoreMatchers.nullValue;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.mockito.Mockito.mock;
+
+/**
+ * Tests for {@link ConsulRegistry} that run against a Consul process started through
+ * {@link ConsulStarterBuilder} before each test.
+ */
+public class ConsulRegistryTest {
+
+    // Consul process started in setUp() and closed in tearDown().
+    private ConsulProcess consul;
+    // Registry under test, created from registryUrl.
+    private ConsulRegistry consulRegistry;
+    private String service = "org.apache.dubbo.test.injvmServie";
+    // URL of the service that the tests register, subscribe to and look up.
+    private URL serviceUrl = URL.valueOf("consul://127.0.0.1:" + NetUtils.getAvailablePort() + "/" + service + "?notify=false&methods=test1,test2");
+    // URL of the Consul process, built from its HTTP port.
+    private URL registryUrl;
+    private ConsulRegistryFactory consulRegistryFactory;
+
+    /**
+     * Starts a Consul process (retrying up to 10 times) and creates the registry under
+     * test against its HTTP port.
+     */
+    @BeforeEach
+    public void setUp() {
+        Exception exception = null;
+        for (int i = 0; i < 10; i++) {
+            try {
+                this.consul = ConsulStarterBuilder.consulStarter()
+                        .build()
+                        .start();
+                exception = null;
+            } catch (Exception e) {
+                exception = e;
+                e.printStackTrace();
+            }
+            if (exception == null) {
+                break;
+            }
+        }
+        // Fail the test when none of the start attempts succeeded.
+        Assertions.assertNull(exception);
+        this.registryUrl = URL.valueOf("consul://localhost:" + consul.getHttpPort());
+
+        consulRegistryFactory = new ConsulRegistryFactory();
+        this.consulRegistry = (ConsulRegistry) consulRegistryFactory.createRegistry(registryUrl);
+        consulRegistryFactory.setApplicationModel(ApplicationModel.defaultModel());
+    }
+
+    /**
+     * Closes the Consul process and destroys the registry.
+     * NOTE(review): the registry is destroyed after Consul has been closed - confirm
+     * that this order is intended.
+     */
+    @AfterEach
+    public void tearDown() throws Exception {
+        consul.close();
+        this.consulRegistry.destroy();
+    }
+
+    /**
+     * Registering the same URL twice must leave exactly one registered URL.
+     */
+    @Test
+    public void testRegister() {
+        Set<URL> registered;
+
+        for (int i = 0; i < 2; i++) {
+            consulRegistry.register(serviceUrl);
+            registered = consulRegistry.getRegistered();
+            assertThat(registered.contains(serviceUrl), is(true));
+        }
+
+        registered = consulRegistry.getRegistered();
+
+        assertThat(registered.size(), is(1));
+    }
+
+    /**
+     * Subscribing adds the listener under the service URL; unsubscribing removes the
+     * listener but keeps the (now empty) entry for the URL.
+     */
+    @Test
+    public void testSubscribe() {
+        NotifyListener listener = mock(NotifyListener.class);
+        consulRegistry.subscribe(serviceUrl, listener);
+
+        Map<URL, Set<NotifyListener>> subscribed = consulRegistry.getSubscribed();
+        assertThat(subscribed.size(), is(1));
+        assertThat(subscribed.get(serviceUrl).size(), is(1));
+
+        consulRegistry.unsubscribe(serviceUrl, listener);
+        subscribed = consulRegistry.getSubscribed();
+        assertThat(subscribed.size(), is(1));
+        assertThat(subscribed.get(serviceUrl).size(), is(0));
+    }
+
+    /**
+     * The registry reports itself as available after a registration.
+     */
+    @Test
+    public void testAvailable() {
+        consulRegistry.register(serviceUrl);
+        assertThat(consulRegistry.isAvailable(), is(true));
+
+//        consulRegistry.destroy();
+//        assertThat(consulRegistry.isAvailable(), is(false));
+    }
+
+    /**
+     * Lookup returns nothing before registration and one URL after it.
+     */
+    @Test
+    public void testLookup() throws InterruptedException {
+        List<URL> lookup = consulRegistry.lookup(serviceUrl);
+        assertThat(lookup.size(), is(0));
+
+        consulRegistry.register(serviceUrl);
+        // Wait before looking the service up again.
+        Thread.sleep(5000);
+        lookup = consulRegistry.lookup(serviceUrl);
+        assertThat(lookup.size(), is(1));
+    }
+
+    /**
+     * The status checker reports UNKNOWN before any registry has been obtained from the
+     * factory, and OK once one exists and after a registration.
+     */
+    @Test
+    public void testStatusChecker() {
+        RegistryStatusChecker registryStatusChecker = new RegistryStatusChecker(ApplicationModel.defaultModel());
+        Status status = registryStatusChecker.check();
+        assertThat(status.getLevel(), is(Status.Level.UNKNOWN));
+
+        Registry registry = consulRegistryFactory.getRegistry(registryUrl);
+        assertThat(registry, not(nullValue()));
+
+        status = registryStatusChecker.check();
+        assertThat(status.getLevel(), is(Status.Level.OK));
+
+        registry.register(serviceUrl);
+        status = registryStatusChecker.check();
+        assertThat(status.getLevel(), is(Status.Level.OK));
+    }
+
+}
diff --git a/dubbo-registry-extensions/dubbo-registry-consul/src/test/java/org/apache/dubbo/registry/consul/ConsulServiceDiscoveryTest.java b/dubbo-registry-extensions/dubbo-registry-consul/src/test/java/org/apache/dubbo/registry/consul/ConsulServiceDiscoveryTest.java
new file mode 100644
index 0000000..f6e468f
--- /dev/null
+++ b/dubbo-registry-extensions/dubbo-registry-consul/src/test/java/org/apache/dubbo/registry/consul/ConsulServiceDiscoveryTest.java
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dubbo.registry.consul;
+
+import org.apache.dubbo.common.URL;
+import org.apache.dubbo.config.ApplicationConfig;
+import org.apache.dubbo.registry.client.DefaultServiceInstance;
+import org.apache.dubbo.registry.client.ServiceInstance;
+import org.apache.dubbo.registry.client.event.ServiceInstancesChangedEvent;
+import org.apache.dubbo.registry.client.event.listener.ServiceInstancesChangedListener;
+import org.apache.dubbo.rpc.model.ApplicationModel;
+import org.apache.dubbo.rpc.model.FrameworkModel;
+
+import com.pszymczyk.consul.ConsulProcess;
+import com.pszymczyk.consul.ConsulStarterBuilder;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.mockito.ArgumentCaptor;
+import org.mockito.Mockito;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+public class ConsulServiceDiscoveryTest {
+
+    private URL url;
+    private ConsulServiceDiscovery consulServiceDiscovery;
+    private ConsulProcess consul;
+    private static final String SERVICE_NAME = "A";
+    private static final String LOCALHOST = "127.0.0.1";
+
+    private ApplicationModel applicationModel;
+
+    /**
+     * Prepares a fresh fixture for each test: a new application model named "A", a Consul process
+     * started through {@link ConsulStarterBuilder}, and a {@link ConsulServiceDiscovery} pointing at
+     * that process's HTTP port.
+     *
+     * @throws Exception if any part of the fixture cannot be created
+     */
+    @BeforeEach
+    public void init() throws Exception {
+        this.applicationModel = FrameworkModel.defaultModel().newApplication();
+        applicationModel.getApplicationConfigManager().setApplication(new ApplicationConfig("A"));
+        this.consul = ConsulStarterBuilder.consulStarter()
+                .build()
+                .start();
+        // "consul-check-pass-interval" is set to 1000 — presumably milliseconds; TODO confirm against ConsulServiceDiscovery
+        url = URL.valueOf("consul://localhost:" + consul.getHttpPort()).addParameter("consul-check-pass-interval", "1000");
+        consulServiceDiscovery = new ConsulServiceDiscovery(applicationModel, url);
+    }
+
+    /**
+     * Tears down the per-test fixture: destroys the service discovery, stops the Consul process and
+     * destroys the application model.
+     * <p>
+     * The steps are chained with {@code finally} so that a failure in an earlier step (for example a
+     * discovery that was never created because {@code init()} failed half-way) does not leave the
+     * Consul process or the application model behind.
+     *
+     * @throws Exception if any of the tear-down steps fails
+     */
+    @AfterEach
+    public void close() throws Exception {
+        try {
+            consulServiceDiscovery.destroy();
+        } finally {
+            try {
+                consul.close();
+            } finally {
+                this.applicationModel.destroy();
+            }
+        }
+    }
+
+    /**
+     * Registers two instances of {@code SERVICE_NAME} one after the other, then unregisters them in the
+     * same order, checking after every step what {@code getInstances} returns.
+     *
+     * @throws InterruptedException if one of the pauses between steps is interrupted
+     */
+    @Test
+    public void testRegister() throws InterruptedException {
+        ServiceInstance serviceInstance = new DefaultServiceInstance(SERVICE_NAME, LOCALHOST, 8080, applicationModel);
+        serviceInstance.getMetadata().put("test", "test");
+        serviceInstance.getMetadata().put("test123", "test");
+        consulServiceDiscovery.doRegister(serviceInstance);
+        // fixed 3 s pause after each change — presumably lets Consul reflect it; TODO confirm
+        Thread.sleep(3000);
+        List<ServiceInstance> serviceInstances = consulServiceDiscovery.getInstances(SERVICE_NAME);
+        assertEquals(1, serviceInstances.size());
+        assertEquals(serviceInstance, serviceInstances.get(0));
+
+        // second instance: same host and metadata, different port
+        ServiceInstance serviceInstance2 = new DefaultServiceInstance(SERVICE_NAME, LOCALHOST, 8081, applicationModel);
+        serviceInstance2.getMetadata().put("test", "test");
+        serviceInstance2.getMetadata().put("test123", "test");
+        consulServiceDiscovery.doRegister(serviceInstance2);
+
+        Thread.sleep(3000);
+        serviceInstances = consulServiceDiscovery.getInstances(SERVICE_NAME);
+        assertEquals(2, serviceInstances.size());
+        // NOTE(review): list equality is order-sensitive — assumes getInstances returns instances in registration order; confirm
+        assertEquals(Arrays.asList(serviceInstance, serviceInstance2), serviceInstances);
+
+        // removing the first instance must leave only the second one
+        consulServiceDiscovery.doUnregister(serviceInstance);
+        Thread.sleep(3000);
+        serviceInstances = consulServiceDiscovery.getInstances(SERVICE_NAME);
+        assertEquals(1, serviceInstances.size());
+        assertEquals(serviceInstance2, serviceInstances.get(0));
+
+        // removing the second instance must leave nothing
+        consulServiceDiscovery.doUnregister(serviceInstance2);
+        Thread.sleep(3000);
+        serviceInstances = consulServiceDiscovery.getInstances(SERVICE_NAME);
+        assertEquals(0, serviceInstances.size());
+    }
+
+    /**
+     * Checks that a registered {@link ServiceInstancesChangedListener} receives an event after every
+     * register/unregister step and that a listener which was added and then removed receives none.
+     * After each step the most recently captured event is inspected.
+     *
+     * @throws InterruptedException if one of the pauses between steps is interrupted
+     */
+    @Test
+    public void testNotify() throws InterruptedException {
+        // both listeners are Mockito spies so that calls to onEvent can be verified
+        ServiceInstancesChangedListener serviceInstancesChangedListener1 = Mockito.spy(new ServiceInstancesChangedListener(Collections.singleton(SERVICE_NAME), consulServiceDiscovery));
+        ServiceInstancesChangedListener serviceInstancesChangedListener2 = Mockito.spy(new ServiceInstancesChangedListener(Collections.singleton(SERVICE_NAME), consulServiceDiscovery));
+        consulServiceDiscovery.addServiceInstancesChangedListener(serviceInstancesChangedListener1);
+        consulServiceDiscovery.addServiceInstancesChangedListener(serviceInstancesChangedListener2);
+        // listener2 is removed straight away; the final verification expects it to have seen no event
+        consulServiceDiscovery.removeServiceInstancesChangedListener(serviceInstancesChangedListener2);
+        ArgumentCaptor<ServiceInstancesChangedEvent> eventArgumentCaptor = ArgumentCaptor.forClass(ServiceInstancesChangedEvent.class);
+
+        ServiceInstance serviceInstance = new DefaultServiceInstance(SERVICE_NAME, LOCALHOST, 8080, applicationModel);
+        serviceInstance.getMetadata().put("test", "test");
+        serviceInstance.getMetadata().put("test123", "test");
+        consulServiceDiscovery.doRegister(serviceInstance);
+        // fixed 3 s pause after each change — presumably lets the notification arrive; TODO confirm
+        Thread.sleep(3000);
+
+        Mockito.verify(serviceInstancesChangedListener1, Mockito.atLeast(1)).onEvent(eventArgumentCaptor.capture());
+        List<ServiceInstance> serviceInstances = eventArgumentCaptor.getValue().getServiceInstances();
+        assertEquals(1, serviceInstances.size());
+        assertEquals(serviceInstance, serviceInstances.get(0));
+
+        // second instance: same host and metadata, different port
+        ServiceInstance serviceInstance2 = new DefaultServiceInstance(SERVICE_NAME, LOCALHOST, 8081, applicationModel);
+        serviceInstance2.getMetadata().put("test", "test");
+        serviceInstance2.getMetadata().put("test123", "test");
+        consulServiceDiscovery.doRegister(serviceInstance2);
+
+        Thread.sleep(3000);
+        Mockito.verify(serviceInstancesChangedListener1, Mockito.atLeast(2)).onEvent(eventArgumentCaptor.capture());
+        serviceInstances = eventArgumentCaptor.getValue().getServiceInstances();
+        assertEquals(2, serviceInstances.size());
+        // NOTE(review): list equality is order-sensitive — assumes the event lists instances in registration order; confirm
+        assertEquals(Arrays.asList(serviceInstance, serviceInstance2), serviceInstances);
+
+        // after the first instance is unregistered the latest event must contain only the second one
+        consulServiceDiscovery.doUnregister(serviceInstance);
+        Thread.sleep(3000);
+        Mockito.verify(serviceInstancesChangedListener1, Mockito.atLeast(3)).onEvent(eventArgumentCaptor.capture());
+        serviceInstances = eventArgumentCaptor.getValue().getServiceInstances();
+        assertEquals(1, serviceInstances.size());
+        assertEquals(serviceInstance2, serviceInstances.get(0));
+
+        // after the second instance is unregistered the latest event must be empty
+        consulServiceDiscovery.doUnregister(serviceInstance2);
+        Thread.sleep(3000);
+        Mockito.verify(serviceInstancesChangedListener1, Mockito.atLeast(4)).onEvent(eventArgumentCaptor.capture());
+        serviceInstances = eventArgumentCaptor.getValue().getServiceInstances();
+        assertEquals(0, serviceInstances.size());
+
+        Mockito.verify(serviceInstancesChangedListener2, Mockito.never()).onEvent(Mockito.any());
+    }
+}
diff --git a/dubbo-registry-extensions/dubbo-registry-etcd3/pom.xml b/dubbo-registry-extensions/dubbo-registry-etcd3/pom.xml
new file mode 100644
index 0000000..edf71af
--- /dev/null
+++ b/dubbo-registry-extensions/dubbo-registry-etcd3/pom.xml
@@ -0,0 +1,51 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one or more
+  ~ contributor license agreements.  See the NOTICE file distributed with
+  ~ this work for additional information regarding copyright ownership.
+  ~ The ASF licenses this file to You under the Apache License, Version 2.0
+  ~ (the "License"); you may not use this file except in compliance with
+  ~ the License.  You may obtain a copy of the License at
+  ~
+  ~     http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  -->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <parent>
+        <artifactId>dubbo-registry-extensions</artifactId>
+        <groupId>org.apache.dubbo.extensions</groupId>
+        <version>${revision}</version>
+        <relativePath>../pom.xml</relativePath>
+    </parent>
+    <modelVersion>4.0.0</modelVersion>
+
+    <version>1.0.0-SNAPSHOT</version>
+    <artifactId>dubbo-registry-etcd3</artifactId>
+    <packaging>jar</packaging>
+    <name>${project.artifactId}</name>
+    <description>The etcd3 registry module of Dubbo project</description>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.dubbo</groupId>
+            <artifactId>dubbo-registry-api</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.dubbo</groupId>
+            <artifactId>dubbo-common</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.dubbo.extensions</groupId>
+            <artifactId>dubbo-remoting-etcd3</artifactId>
+            <version>1.0.0-SNAPSHOT</version>
+        </dependency>
+    </dependencies>
+
+
+</project>
diff --git a/dubbo-registry-extensions/dubbo-registry-etcd3/src/main/java/org/apache/dubbo/registry/etcd/EtcdRegistry.java b/dubbo-registry-extensions/dubbo-registry-etcd3/src/main/java/org/apache/dubbo/registry/etcd/EtcdRegistry.java
new file mode 100644
index 0000000..ef7ee63
--- /dev/null
+++ b/dubbo-registry-extensions/dubbo-registry-etcd3/src/main/java/org/apache/dubbo/registry/etcd/EtcdRegistry.java
@@ -0,0 +1,365 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dubbo.registry.etcd;
+
+import org.apache.dubbo.common.URL;
+import org.apache.dubbo.common.utils.CollectionUtils;
+import org.apache.dubbo.common.utils.ConcurrentHashSet;
+import org.apache.dubbo.common.utils.UrlUtils;
+import org.apache.dubbo.registry.NotifyListener;
+import org.apache.dubbo.registry.support.FailbackRegistry;
+import org.apache.dubbo.remoting.etcd.ChildListener;
+import org.apache.dubbo.remoting.etcd.Constants;
+import org.apache.dubbo.remoting.etcd.EtcdClient;
+import org.apache.dubbo.remoting.etcd.EtcdTransporter;
+import org.apache.dubbo.remoting.etcd.StateListener;
+import org.apache.dubbo.remoting.etcd.option.OptionUtil;
+import org.apache.dubbo.rpc.RpcException;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+
+import static org.apache.dubbo.common.constants.CommonConstants.ANY_VALUE;
+import static org.apache.dubbo.common.constants.CommonConstants.GROUP_KEY;
+import static org.apache.dubbo.common.constants.CommonConstants.INTERFACE_KEY;
+import static org.apache.dubbo.common.constants.CommonConstants.PATH_SEPARATOR;
+import static org.apache.dubbo.common.constants.RegistryConstants.CATEGORY_KEY;
+import static org.apache.dubbo.common.constants.RegistryConstants.CONFIGURATORS_CATEGORY;
+import static org.apache.dubbo.common.constants.RegistryConstants.CONSUMERS_CATEGORY;
+import static org.apache.dubbo.common.constants.RegistryConstants.DEFAULT_CATEGORY;
+import static org.apache.dubbo.common.constants.RegistryConstants.DYNAMIC_KEY;
+import static org.apache.dubbo.common.constants.RegistryConstants.EMPTY_PROTOCOL;
+import static org.apache.dubbo.common.constants.RegistryConstants.PROVIDERS_CATEGORY;
+import static org.apache.dubbo.common.constants.RegistryConstants.ROUTERS_CATEGORY;
+import static org.apache.dubbo.remoting.Constants.CHECK_KEY;
+
+
+/**
+ * Support for etcd3 registry.
+ */
+public class EtcdRegistry extends FailbackRegistry {
+
+    private static final int DEFAULT_ETCD_PORT = 2379;
+
+    private static final String DEFAULT_ROOT = "dubbo";
+
+    private final String root;
+
+    private final Set<String> anyServices = new ConcurrentHashSet<>();
+
+    private final ConcurrentMap<URL, ConcurrentMap<NotifyListener, ChildListener>> etcdListeners = new ConcurrentHashMap<>();
+    private final EtcdClient etcdClient;
+
+    /**
+     * Creates the registry, connects to etcd through {@code etcdTransporter} and installs a state
+     * listener that calls {@code recover()} whenever the client reports {@code CONNECTED}.
+     *
+     * @param url             the registry URL; its {@code group} parameter (default {@code "dubbo"})
+     *                        becomes the root path, prefixed with the path separator if necessary
+     * @param etcdTransporter the transporter used to open the etcd client connection
+     * @throws IllegalStateException if {@code url.isAnyHost()} is true
+     */
+    public EtcdRegistry(URL url, EtcdTransporter etcdTransporter) {
+        super(url);
+        if (url.isAnyHost()) {
+            throw new IllegalStateException("registry address is invalid, actual: '" + url.getHost() + "'");
+        }
+        String group = url.getParameter(GROUP_KEY, DEFAULT_ROOT);
+        // make sure the root path always starts with the path separator
+        if (!group.startsWith(PATH_SEPARATOR)) {
+            group = PATH_SEPARATOR + group;
+        }
+        this.root = group;
+        etcdClient = etcdTransporter.connect(url);
+
+        etcdClient.addStateListener(state -> {
+            if (state == StateListener.CONNECTED) {
+                try {
+                    recover();
+                } catch (Exception e) {
+                    // a failed recovery is logged only; it is not propagated to the client's listener thread
+                    logger.error(e.getMessage(), e);
+                }
+            }
+        });
+    }
+
+    /**
+     * Completes an address that has no usable port with the default etcd port.
+     * <p>
+     * A {@code null} or empty address is returned unchanged. An address without a colon gets
+     * {@code ":" + DEFAULT_ETCD_PORT} appended; an address whose text after the first colon parses
+     * to {@code 0} has that part replaced by the default port. Any other address is returned as is.
+     *
+     * @param address the address to complete, may be {@code null}
+     * @return the address with a port, or the input itself when nothing had to be added
+     * @throws NumberFormatException if the text after the first colon is not a valid integer
+     */
+    protected static String appendDefaultPort(String address) {
+        if (address == null || address.isEmpty()) {
+            return address;
+        }
+        int colon = address.indexOf(':');
+        if (colon < 0) {
+            return address + ":" + DEFAULT_ETCD_PORT;
+        }
+        String portText = address.substring(colon + 1);
+        if (Integer.parseInt(portText) == 0) {
+            return address.substring(0, colon + 1) + DEFAULT_ETCD_PORT;
+        }
+        return address;
+    }
+
+    /**
+     * Writes the node for {@code url} to etcd: an ephemeral node when the URL's {@code dynamic}
+     * parameter is true (the default), a plain node otherwise.
+     *
+     * @param url the URL to register
+     * @throws RpcException if the node cannot be created; the original throwable is kept as the cause
+     */
+    @Override
+    public void doRegister(URL url) {
+        try {
+            String nodePath = toUrlPath(url);
+            if (!url.getParameter(DYNAMIC_KEY, true)) {
+                etcdClient.create(nodePath);
+            } else {
+                etcdClient.createEphemeral(nodePath);
+            }
+        } catch (Throwable e) {
+            String prefix = "Failed to register " + url + " to etcd " + getUrl() + ", cause: ";
+            String cause = OptionUtil.isProtocolError(e)
+                    ? "etcd3 registry may not be supported yet or etcd3 registry is not available."
+                    : e.getMessage();
+            throw new RpcException(prefix + cause, e);
+        }
+    }
+
+    /**
+     * Deletes the etcd node that belongs to {@code url}.
+     *
+     * @param url the URL to unregister
+     * @throws RpcException if the node cannot be deleted; the original throwable is kept as the cause
+     */
+    @Override
+    public void doUnregister(URL url) {
+        try {
+            etcdClient.delete(toUrlPath(url));
+        } catch (Throwable e) {
+            String message = "Failed to unregister " + url + " to etcd " + getUrl() + ", cause: " + e.getMessage();
+            throw new RpcException(message, e);
+        }
+    }
+
+    /**
+     * Subscribes {@code listener} to changes for {@code url}.
+     * <p>
+     * When the service interface of {@code url} is {@code ANY_VALUE}, a child listener is attached to
+     * the root path and every service name found there is subscribed individually through
+     * {@code subscribe(...)}. Otherwise a child listener is attached to each category path of
+     * {@code url}, and {@code listener} is notified once with the URLs collected from all those paths.
+     *
+     * @param url      the URL to subscribe to
+     * @param listener the listener to notify
+     * @throws RpcException if anything fails; the original throwable is kept as the cause
+     */
+    @Override
+    public void doSubscribe(URL url, NotifyListener listener) {
+        try {
+            if (ANY_VALUE.equals(url.getServiceInterface())) {
+                // local variable that shadows the field of the same name
+                String root = toRootPath();
+
+                /*
+                 *  We are interested in all interfaces:
+                 *  look up the listener container for this url, creating it at most once.
+                 */
+                ConcurrentMap<NotifyListener, ChildListener> listeners =
+                        Optional.ofNullable(etcdListeners.get(url))
+                                .orElseGet(() -> {
+                                    ConcurrentMap<NotifyListener, ChildListener> container, prev;
+                                    prev = etcdListeners.putIfAbsent(url, container = new ConcurrentHashMap<>());
+                                    return prev != null ? prev : container;
+                                });
+
+                /*
+                 *  If no interface watcher exists yet for this listener,
+                 *  create one for the current root; it is stored at most once.
+                 */
+                ChildListener interfaceListener =
+                        Optional.ofNullable(listeners.get(listener))
+                                .orElseGet(() -> {
+                                    ChildListener childListener, prev;
+                                    prev = listeners.putIfAbsent(listener, childListener = (parentPath, currentChildren) -> {
+                                        /*
+                                         *  Because etcd3 does not support watching direct children only,
+                                         *  events that are not interface events have to be filtered out: if we
+                                         *  watch /dubbo and /dubbo/interface and put the key-value pair
+                                         *  {/dubbo/interface/hello hello}, the watch on /dubbo gets an event too.
+                                         */
+                                        for (String child : currentChildren) {
+                                            child = URL.decode(child);
+                                            if (!anyServices.contains(child)) {
+                                                anyServices.add(child);
+                                                /*
+                                                 *  A new interface has appeared: subscribe to it so that its
+                                                 *  direct children are watched as well, e.g. /dubbo/<interface>.
+                                                 */
+                                                subscribe(url.setPath(child).addParameters(INTERFACE_KEY, child,
+                                                        CHECK_KEY, String.valueOf(false)), listener);
+                                            }
+                                        }
+                                    });
+                                    return prev != null ? prev : childListener;
+                                });
+
+                etcdClient.create(root);
+                /*
+                 *  On the first subscription, pull the interfaces that already exist and
+                 *  subscribe to each of them so that their direct children are watched, e.g. /dubbo/<interface>.
+                 */
+                List<String> services = etcdClient.addChildListener(root, interfaceListener);
+                for (String service : services) {
+                    service = URL.decode(service);
+                    anyServices.add(service);
+                    subscribe(url.setPath(service).addParameters(INTERFACE_KEY, service,
+                            CHECK_KEY, String.valueOf(false)), listener);
+                }
+            } else {
+                // URLs gathered from every category path; delivered to the listener in one notification below
+                List<URL> urls = new ArrayList<>();
+                for (String path : toCategoriesPath(url)) {
+
+                    /*
+                     *  We are interested in specific categories (providers, consumers, routers and so on):
+                     *  look up the listener container for this url, creating it at most once.
+                     */
+                    ConcurrentMap<NotifyListener, ChildListener> listeners =
+                            Optional.ofNullable(etcdListeners.get(url))
+                                    .orElseGet(() -> {
+                                        ConcurrentMap<NotifyListener, ChildListener> container, prev;
+                                        prev = etcdListeners.putIfAbsent(url,
+                                                container = new ConcurrentHashMap<>());
+                                        return prev != null ? prev : container;
+                                    });
+
+                    /*
+                     *  If no category watcher exists yet for this listener, create one; it is stored at
+                     *  most once, so the same watcher is reused for every category path of this url.
+                     */
+                    ChildListener childListener =
+                            Optional.ofNullable(listeners.get(listener))
+                                    .orElseGet(() -> {
+                                        ChildListener watchListener, prev;
+                                        prev = listeners.putIfAbsent(listener, watchListener = (parentPath, currentChildren) -> EtcdRegistry.this.notify(url, listener,
+                                                toUrlsWithEmpty(url, parentPath, currentChildren)));
+                                        return prev != null ? prev : watchListener;
+                                    });
+
+                    etcdClient.create(path);
+                    /*
+                     *  On the first subscription, pull the children that already exist under the category
+                     *  and watch it, e.g. /dubbo/<interface>/providers, /dubbo/<interface>/consumers and so on.
+                     */
+                    List<String> children = etcdClient.addChildListener(path, childListener);
+                    if (children != null) {
+                        urls.addAll(toUrlsWithEmpty(url, path, children));
+                    }
+                }
+                notify(url, listener, urls);
+            }
+        } catch (Throwable e) {
+            throw new RpcException("Failed to subscribe " + url + " to etcd " + getUrl()
+                    + ", cause: " + (OptionUtil.isProtocolError(e)
+                    ? "etcd3 registry may not be supported yet or etcd3 registry is not available."
+                    : e.getMessage()), e);
+        }
+    }
+
+    /**
+     * Removes the etcd watcher that was installed for {@code listener} on {@code url}.
+     * <p>
+     * For an {@code ANY_VALUE} subscription the watcher is detached from the root path; otherwise it
+     * is detached from every path returned by {@code toUnsubscribedPath(url)}. When no listener is
+     * left for {@code url}, its entry is dropped from the listener map.
+     *
+     * @param url      the URL that was subscribed
+     * @param listener the listener to detach
+     */
+    @Override
+    public void doUnsubscribe(URL url, NotifyListener listener) {
+        ConcurrentMap<NotifyListener, ChildListener> registered = etcdListeners.get(url);
+        if (registered == null) {
+            return;
+        }
+
+        ChildListener watcher = registered.remove(listener);
+        if (watcher != null) {
+            if (!ANY_VALUE.equals(url.getServiceInterface())) {
+                // one url may have been subscribed on several category paths
+                for (String subscribedPath : toUnsubscribedPath(url)) {
+                    etcdClient.removeChildListener(subscribedPath, watcher);
+                }
+            } else {
+                etcdClient.removeChildListener(toRootPath(), watcher);
+            }
+        }
+
+        if (registered.isEmpty()) {
+            etcdListeners.remove(url);
+        }
+    }
+
+    /**
+     * Reports whether the etcd client is currently connected.
+     *
+     * @return the value of {@code etcdClient.isConnected()}
+     */
+    @Override
+    public boolean isAvailable() {
+        return etcdClient.isConnected();
+    }
+
+    /**
+     * Runs the inherited destroy logic and then closes the etcd client.
+     * A failure while closing the client is logged as a warning and not rethrown.
+     */
+    @Override
+    public void destroy() {
+        super.destroy();
+        try {
+            etcdClient.close();
+        } catch (Exception e) {
+            logger.warn("Failed to close etcd client " + getUrl() + ", cause: " + e.getMessage(), e);
+        }
+    }
+
+    /**
+     * Returns the root path, guaranteed to begin with the path separator.
+     *
+     * @return {@code root} itself when it already starts with the separator, otherwise the separator followed by {@code root}
+     */
+    protected String toRootDir() {
+        return root.startsWith(PATH_SEPARATOR) ? root : PATH_SEPARATOR + root;
+    }
+
+    /**
+     * Returns the root path exactly as stored in the {@code root} field.
+     *
+     * @return the root path
+     */
+    protected String toRootPath() {
+        return root;
+    }
+
+    /**
+     * Builds the etcd path of the service that {@code url} refers to.
+     *
+     * @param url the URL whose service interface is used
+     * @return the root path when the interface is {@code ANY_VALUE}; otherwise the root directory
+     *         followed by the path separator and the URL-encoded interface name
+     */
+    protected String toServicePath(URL url) {
+        String serviceInterface = url.getServiceInterface();
+        return ANY_VALUE.equals(serviceInterface)
+                ? toRootPath()
+                : toRootDir() + PATH_SEPARATOR + URL.encode(serviceInterface);
+    }
+
+    /**
+     * Builds one etcd path per category requested by {@code url}.
+     * <p>
+     * A category parameter equal to {@code ANY_VALUE} expands to providers, consumers, routers and
+     * configurators; otherwise the parameter's values are used, falling back to the default category.
+     *
+     * @param url the URL whose category parameter and service interface are used
+     * @return the category paths, in the same order as the categories
+     */
+    protected String[] toCategoriesPath(URL url) {
+        String[] categories = ANY_VALUE.equals(url.getParameter(CATEGORY_KEY))
+                ? new String[]{PROVIDERS_CATEGORY, CONSUMERS_CATEGORY, ROUTERS_CATEGORY, CONFIGURATORS_CATEGORY}
+                : url.getParameter(CATEGORY_KEY, new String[]{DEFAULT_CATEGORY});
+        String[] paths = new String[categories.length];
+        int index = 0;
+        for (String category : categories) {
+            paths[index++] = toServicePath(url) + PATH_SEPARATOR + category;
+        }
+        return paths;
+    }
+
+    /**
+     * Builds the etcd path of the single category named by the URL's category parameter,
+     * using the default category when the parameter is absent.
+     *
+     * @param url the URL whose service interface and category parameter are used
+     * @return the service path followed by the path separator and the category
+     */
+    protected String toCategoryPath(URL url) {
+        return toServicePath(url) + PATH_SEPARATOR + url.getParameter(CATEGORY_KEY, DEFAULT_CATEGORY);
+    }
+
+    /**
+     * Builds the etcd node path for {@code url}.
+     *
+     * @param url the URL to build the path for
+     * @return the category path followed by the path separator and the URL-encoded full string of {@code url}
+     */
+    protected String toUrlPath(URL url) {
+        return toCategoryPath(url) + PATH_SEPARATOR + URL.encode(url.toFullString());
+    }
+
+    /**
+     * Lists the etcd paths from which a watcher for {@code url} has to be detached.
+     *
+     * @param url the URL that was subscribed
+     * @return for an {@code ANY_VALUE} interface, a single-element list holding the group path
+     *         (the {@code group} parameter, default {@code "dubbo"}, with a leading path separator);
+     *         otherwise all category paths of {@code url}
+     */
+    protected List<String> toUnsubscribedPath(URL url) {
+        List<String> paths = new ArrayList<>();
+        if (!ANY_VALUE.equals(url.getServiceInterface())) {
+            paths.addAll(Arrays.asList(toCategoriesPath(url)));
+            return paths;
+        }
+        String group = url.getParameter(GROUP_KEY, DEFAULT_ROOT);
+        paths.add(group.startsWith(PATH_SEPARATOR) ? group : PATH_SEPARATOR + group);
+        return paths;
+    }
+
+    /**
+     * Converts the encoded provider strings into URLs and keeps those that match {@code consumer}.
+     * <p>
+     * Each entry is URL-decoded first; entries whose decoded text does not contain
+     * {@code Constants.HTTP_SUBFIX_KEY} are skipped without being parsed.
+     *
+     * @param consumer  the subscribing URL used for matching
+     * @param providers the encoded provider strings, may be {@code null} or empty
+     * @return a new list with the matching URLs; empty when nothing matches
+     */
+    protected List<URL> toUrlsWithoutEmpty(URL consumer, List<String> providers) {
+        List<URL> matched = new ArrayList<>();
+        if (providers == null || providers.isEmpty()) {
+            return matched;
+        }
+        for (String encoded : providers) {
+            String decoded = URL.decode(encoded);
+            if (!decoded.contains(Constants.HTTP_SUBFIX_KEY)) {
+                continue;
+            }
+            URL candidate = URL.valueOf(decoded);
+            if (UrlUtils.isMatch(consumer, candidate)) {
+                matched.add(candidate);
+            }
+        }
+        return matched;
+    }
+
+    /**
+     * Same as {@code toUrlsWithoutEmpty}, except that an empty result is replaced by a single
+     * placeholder URL: {@code consumer} with the {@code empty} protocol and its category set to the
+     * last segment of {@code path}.
+     *
+     * @param consumer  the subscribing URL used for matching and as the base of the placeholder
+     * @param path      the category path the providers were read from
+     * @param providers the encoded provider strings, may be {@code null} or empty
+     * @return the matching URLs, or a list holding only the placeholder URL
+     */
+    protected List<URL> toUrlsWithEmpty(URL consumer, String path, List<String> providers) {
+        List<URL> urls = toUrlsWithoutEmpty(consumer, providers);
+        if (!CollectionUtils.isEmpty(urls)) {
+            return urls;
+        }
+        int lastSlash = path.lastIndexOf('/');
+        String category = lastSlash >= 0 ? path.substring(lastSlash + 1) : path;
+        urls.add(consumer.setProtocol(EMPTY_PROTOCOL).addParameter(CATEGORY_KEY, category));
+        return urls;
+    }
+}
diff --git a/dubbo-registry-extensions/dubbo-registry-etcd3/src/main/java/org/apache/dubbo/registry/etcd/EtcdRegistryFactory.java b/dubbo-registry-extensions/dubbo-registry-etcd3/src/main/java/org/apache/dubbo/registry/etcd/EtcdRegistryFactory.java
new file mode 100644
index 0000000..a7d00b3
--- /dev/null
+++ b/dubbo-registry-extensions/dubbo-registry-etcd3/src/main/java/org/apache/dubbo/registry/etcd/EtcdRegistryFactory.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dubbo.registry.etcd;
+
+import org.apache.dubbo.common.URL;
+import org.apache.dubbo.registry.Registry;
+import org.apache.dubbo.registry.support.AbstractRegistryFactory;
+import org.apache.dubbo.remoting.etcd.EtcdTransporter;
+
+/**
+ * Registry factory that creates {@link EtcdRegistry} instances.
+ */
+public class EtcdRegistryFactory extends AbstractRegistryFactory {
+
+    // transporter handed to every EtcdRegistry created by this factory; assigned through setEtcdTransporter
+    private EtcdTransporter etcdTransporter;
+
+    /**
+     * Creates an {@link EtcdRegistry} for {@code url} with the transporter currently set on this factory.
+     *
+     * @param url the registry URL
+     * @return a new {@link EtcdRegistry}
+     */
+    @Override
+    protected Registry createRegistry(URL url) {
+        return new EtcdRegistry(url, etcdTransporter);
+    }
+
+    /**
+     * Sets the transporter used by {@link #createRegistry(URL)}.
+     * NOTE(review): presumably invoked by Dubbo's extension injection rather than by user code — confirm.
+     *
+     * @param etcdTransporter the transporter to use
+     */
+    public void setEtcdTransporter(EtcdTransporter etcdTransporter) {
+        this.etcdTransporter = etcdTransporter;
+    }
+}
diff --git a/dubbo-registry-extensions/dubbo-registry-etcd3/src/main/java/org/apache/dubbo/registry/etcd/EtcdServiceDiscovery.java b/dubbo-registry-extensions/dubbo-registry-etcd3/src/main/java/org/apache/dubbo/registry/etcd/EtcdServiceDiscovery.java
new file mode 100644
index 0000000..a7bffa4
--- /dev/null
+++ b/dubbo-registry-extensions/dubbo-registry-etcd3/src/main/java/org/apache/dubbo/registry/etcd/EtcdServiceDiscovery.java
@@ -0,0 +1,240 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dubbo.registry.etcd;
+
+import org.apache.dubbo.common.URL;
+import org.apache.dubbo.common.logger.Logger;
+import org.apache.dubbo.common.logger.LoggerFactory;
+import org.apache.dubbo.common.utils.CollectionUtils;
+import org.apache.dubbo.common.utils.ConcurrentHashSet;
+import org.apache.dubbo.registry.client.AbstractServiceDiscovery;
+import org.apache.dubbo.registry.client.DefaultServiceInstance;
+import org.apache.dubbo.registry.client.ServiceInstance;
+import org.apache.dubbo.registry.client.event.ServiceInstancesChangedEvent;
+import org.apache.dubbo.registry.client.event.listener.ServiceInstancesChangedListener;
+import org.apache.dubbo.remoting.etcd.ChildListener;
+import org.apache.dubbo.remoting.etcd.EtcdClient;
+import org.apache.dubbo.remoting.etcd.EtcdTransporter;
+import org.apache.dubbo.remoting.etcd.StateListener;
+import org.apache.dubbo.remoting.etcd.option.OptionUtil;
+import org.apache.dubbo.rpc.RpcException;
+import org.apache.dubbo.rpc.model.ApplicationModel;
+
+import com.google.gson.Gson;
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CopyOnWriteArrayList;
+
+/**
+ * Service discovery backed by etcd.
+ * <p>
+ * Each service instance is stored as JSON under the key
+ * {@code /services/<serviceName>/<host>:<port>}. Change notifications are delivered through one
+ * {@link InstanceChildListener} per service name, which fans out to every registered
+ * {@link ServiceInstancesChangedListener}.
+ * <p>
+ * 2019-07-08
+ */
+public class EtcdServiceDiscovery extends AbstractServiceDiscovery {
+
+    private static final Logger logger = LoggerFactory.getLogger(EtcdServiceDiscovery.class);
+
+    /**
+     * Separator between etcd key segments. Etcd keys are not file-system paths, so this must be
+     * {@code "/"} on every platform; {@link File#separator} would produce {@code "\"} on Windows
+     * and yield keys that do not line up with {@link #root}.
+     */
+    private static final String PATH_SEPARATOR = "/";
+
+    /** Shared JSON codec, reused instead of allocating a new {@link Gson} for every key. */
+    private static final Gson GSON = new Gson();
+
+    /** Key prefix under which all service instances are stored. */
+    private final String root = "/services";
+
+    /** Names of the services registered through this discovery instance. */
+    private final Set<String> services = new ConcurrentHashSet<>();
+
+    /** One etcd child listener per watched service name. */
+    private final Map<String, InstanceChildListener> childListenerMap = new ConcurrentHashMap<>();
+
+    EtcdClient etcdClient;
+
+    /**
+     * Connects to etcd and installs a state listener that re-registers the local instance
+     * whenever the client reports {@link StateListener#CONNECTED}.
+     *
+     * @param applicationModel model used to look up the adaptive {@link EtcdTransporter}
+     * @param registryURL      address of the etcd registry
+     * @throws IllegalStateException if {@code registryURL} is an "any host" address
+     */
+    public EtcdServiceDiscovery(ApplicationModel applicationModel, URL registryURL) {
+        super(applicationModel, registryURL);
+        EtcdTransporter etcdTransporter = applicationModel.getExtensionLoader(EtcdTransporter.class).getAdaptiveExtension();
+
+        if (registryURL.isAnyHost()) {
+            throw new IllegalStateException("Service discovery address is invalid, actual: '" + registryURL.getHost() + "'");
+        }
+
+        etcdClient = etcdTransporter.connect(registryURL);
+
+        etcdClient.addStateListener(state -> {
+            if (state == StateListener.CONNECTED) {
+                try {
+                    recover();
+                } catch (Exception e) {
+                    // Best effort: a failed recovery must not break the client's state callback.
+                    logger.error(e.getMessage(), e);
+                }
+            }
+        });
+
+        this.registryURL = registryURL;
+    }
+
+    /**
+     * Closes the etcd client if it exists and is still connected.
+     */
+    @Override
+    protected void doDestroy() throws Exception {
+        if (etcdClient != null && etcdClient.isConnected()) {
+            etcdClient.close();
+        }
+    }
+
+    /**
+     * Writes the instance as JSON to its etcd key and records its service name.
+     *
+     * @param serviceInstance instance to publish
+     * @throws RpcException if the write fails for any reason
+     */
+    @Override
+    public void doRegister(ServiceInstance serviceInstance) {
+        try {
+            String path = toPath(serviceInstance);
+            etcdClient.putEphemeral(path, GSON.toJson(serviceInstance));
+            services.add(serviceInstance.getServiceName());
+        } catch (Throwable e) {
+            throw new RpcException("Failed to register " + serviceInstance + " to etcd " + etcdClient.getUrl()
+                + ", cause: " + (OptionUtil.isProtocolError(e)
+                ? "etcd3 registry may not be supported yet or etcd3 registry is not available."
+                : e.getMessage()), e);
+        }
+    }
+
+    /**
+     * Builds the etcd key of a single instance: {@code /services/<serviceName>/<host>:<port>}.
+     *
+     * @param serviceInstance instance whose key is wanted
+     * @return the etcd key for that instance
+     */
+    protected String toPath(ServiceInstance serviceInstance) {
+        return toParentPath(serviceInstance.getServiceName()) + PATH_SEPARATOR + serviceInstance.getHost()
+            + ":" + serviceInstance.getPort();
+    }
+
+    /**
+     * Builds the etcd key that is the parent of all instances of a service:
+     * {@code /services/<serviceName>}.
+     *
+     * @param serviceName name of the service
+     * @return the parent key for that service
+     */
+    protected String toParentPath(String serviceName) {
+        return root + PATH_SEPARATOR + serviceName;
+    }
+
+    /**
+     * Deletes the instance's etcd key and forgets its service name.
+     *
+     * @param serviceInstance instance to withdraw
+     * @throws RpcException if the delete fails for any reason
+     */
+    @Override
+    protected void doUnregister(ServiceInstance serviceInstance) {
+        try {
+            String path = toPath(serviceInstance);
+            etcdClient.delete(path);
+            services.remove(serviceInstance.getServiceName());
+        } catch (Throwable e) {
+            throw new RpcException("Failed to unregister " + serviceInstance + " to etcd " + etcdClient.getUrl() + ", cause: " + e.getMessage(), e);
+        }
+    }
+
+    /**
+     * @return a read-only view of the service names registered through this instance
+     */
+    @Override
+    public Set<String> getServices() {
+        return Collections.unmodifiableSet(services);
+    }
+
+    /**
+     * Starts watching every service name the listener is interested in.
+     *
+     * @param listener listener to notify when the instances of one of its services change
+     */
+    @Override
+    public void addServiceInstancesChangedListener(ServiceInstancesChangedListener listener) throws NullPointerException, IllegalArgumentException {
+        for (String serviceName : listener.getServiceNames()) {
+            registerServiceWatcher(serviceName, listener);
+        }
+    }
+
+    /**
+     * Reads all instances currently stored under the service's parent key.
+     *
+     * @param serviceName name of the service
+     * @return the decoded instances; empty if the service has no children
+     */
+    @Override
+    public List<ServiceInstance> getInstances(String serviceName) {
+        List<String> children = etcdClient.getChildren(toParentPath(serviceName));
+        if (CollectionUtils.isEmpty(children)) {
+            return Collections.emptyList();
+        }
+        return loadInstances(children);
+    }
+
+    /**
+     * Fetches and decodes the value of every given key.
+     * <p>
+     * Keys whose value decodes to {@code null} are skipped, so callers never see {@code null}
+     * elements (for example when a key is deleted between listing and reading).
+     *
+     * @param keys etcd keys of the instances to load; {@code null} is treated as empty
+     * @return a new mutable list with one decoded instance per readable key
+     */
+    private List<ServiceInstance> loadInstances(List<String> keys) {
+        if (keys == null) {
+            return new ArrayList<>();
+        }
+        List<ServiceInstance> instances = new ArrayList<>(keys.size());
+        for (String key : keys) {
+            ServiceInstance instance = GSON.fromJson(etcdClient.getKVValue(key), DefaultServiceInstance.class);
+            if (instance != null) {
+                instances.add(instance);
+            }
+        }
+        return instances;
+    }
+
+    /**
+     * Adds {@code listener} to the watcher of {@code serviceName}, creating the watcher and
+     * attaching it to etcd on first use.
+     *
+     * @param serviceName service to watch
+     * @param listener    listener to notify on changes
+     */
+    protected void registerServiceWatcher(String serviceName, ServiceInstancesChangedListener listener) {
+        String path = toParentPath(serviceName);
+
+        InstanceChildListener childListener = childListenerMap.get(serviceName);
+
+        if (childListener == null) {
+            childListener = new InstanceChildListener(serviceName);
+            childListenerMap.put(serviceName, childListener);
+            // Add the listener before attaching to etcd so it is already present when the
+            // watcher starts delivering callbacks.
+            childListener.addListener(listener);
+
+            etcdClient.create(path);
+            etcdClient.addChildListener(path, childListener);
+        } else {
+            childListener.addListener(listener);
+        }
+    }
+
+    /**
+     * Stops notifying {@code listener} for every service name it is interested in.
+     *
+     * @param listener listener to detach
+     */
+    @Override
+    public void removeServiceInstancesChangedListener(ServiceInstancesChangedListener listener) throws IllegalArgumentException {
+        for (String serviceName : listener.getServiceNames()) {
+            unregisterServiceWatcher(serviceName, listener);
+        }
+    }
+
+    /**
+     * Removes {@code listener} from the watcher of {@code serviceName}; when the watcher has no
+     * listeners left it is detached from etcd and dropped.
+     *
+     * @param serviceName service being watched
+     * @param listener    listener to remove
+     */
+    protected void unregisterServiceWatcher(String serviceName, ServiceInstancesChangedListener listener) {
+        String path = toParentPath(serviceName);
+
+        InstanceChildListener childListener = childListenerMap.get(serviceName);
+
+        if (childListener != null) {
+            childListener.removeListener(listener);
+
+            if (childListener.getListenerCount() == 0) {
+                etcdClient.removeChildListener(path, childListener);
+                // Drop the detached watcher as well; otherwise a later registerServiceWatcher
+                // would reuse it without re-attaching it to etcd and no events would arrive.
+                childListenerMap.remove(serviceName, childListener);
+            }
+        }
+    }
+
+    /**
+     * Etcd child listener for one service name that forwards every change to all registered
+     * {@link ServiceInstancesChangedListener}s.
+     */
+    public class InstanceChildListener implements ChildListener {
+        private final List<ServiceInstancesChangedListener> listeners;
+
+        private final String serviceName;
+
+        public InstanceChildListener(String serviceName) {
+            this.serviceName = serviceName;
+            this.listeners = new CopyOnWriteArrayList<>();
+        }
+
+        /**
+         * Reloads the instances behind {@code children} and publishes them to every listener.
+         *
+         * @param path     key whose children changed
+         * @param children keys of the current children; {@code null} is treated as empty
+         */
+        @Override
+        public void childChanged(String path, List<String> children) {
+            List<ServiceInstance> list = loadInstances(children);
+
+            for (ServiceInstancesChangedListener listener : listeners) {
+                listener.onEvent(new ServiceInstancesChangedEvent(serviceName, list));
+            }
+        }
+
+        public void addListener(ServiceInstancesChangedListener listener) {
+            listeners.add(listener);
+        }
+
+        public void removeListener(ServiceInstancesChangedListener listener) {
+            listeners.remove(listener);
+        }
+
+        public int getListenerCount() {
+            return listeners.size();
+        }
+    }
+
+    /**
+     * Re-publishes the locally held {@code serviceInstance}, if any. Invoked from the state
+     * listener installed in the constructor when the client reports a connected state.
+     */
+    private void recover() throws Exception {
+        // register
+        if (serviceInstance != null) {
+            if (logger.isInfoEnabled()) {
+                logger.info("Recover application register: " + serviceInstance);
+            }
+            doRegister(serviceInstance);
+        }
+    }
+
+    /**
+     * @return the registry URL this discovery was created with
+     */
+    @Override
+    public URL getUrl() {
+        return registryURL;
+    }
+}
diff --git a/dubbo-registry-extensions/dubbo-registry-etcd3/src/main/java/org/apache/dubbo/registry/etcd/EtcdServiceDiscoveryFactory.java b/dubbo-registry-extensions/dubbo-registry-etcd3/src/main/java/org/apache/dubbo/registry/etcd/EtcdServiceDiscoveryFactory.java
new file mode 100644
index 0000000..8a0e0c2
--- /dev/null
+++ b/dubbo-registry-extensions/dubbo-registry-etcd3/src/main/java/org/apache/dubbo/registry/etcd/EtcdServiceDiscoveryFactory.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dubbo.registry.etcd;
+
+import org.apache.dubbo.common.URL;
+import org.apache.dubbo.registry.client.AbstractServiceDiscoveryFactory;
+import org.apache.dubbo.registry.client.ServiceDiscovery;
+
+public class EtcdServiceDiscoveryFactory extends AbstractServiceDiscoveryFactory {
+
+    @Override
+    protected ServiceDiscovery createDiscovery(URL registryURL) {
+        return new EtcdServiceDiscovery(applicationModel, registryURL);
+    }
+
+}
diff --git a/dubbo-registry-extensions/dubbo-registry-etcd3/src/main/resources/META-INF/dubbo/internal/org.apache.dubbo.registry.RegistryFactory b/dubbo-registry-extensions/dubbo-registry-etcd3/src/main/resources/META-INF/dubbo/internal/org.apache.dubbo.registry.RegistryFactory
new file mode 100644
index 0000000..4a6d09c
--- /dev/null
+++ b/dubbo-registry-extensions/dubbo-registry-etcd3/src/main/resources/META-INF/dubbo/internal/org.apache.dubbo.registry.RegistryFactory
@@ -0,0 +1 @@
+etcd3=org.apache.dubbo.registry.etcd.EtcdRegistryFactory
\ No newline at end of file
diff --git a/dubbo-registry-extensions/dubbo-registry-etcd3/src/main/resources/META-INF/dubbo/internal/org.apache.dubbo.registry.client.ServiceDiscoveryFactory b/dubbo-registry-extensions/dubbo-registry-etcd3/src/main/resources/META-INF/dubbo/internal/org.apache.dubbo.registry.client.ServiceDiscoveryFactory
new file mode 100644
index 0000000..2bd11aa
--- /dev/null
+++ b/dubbo-registry-extensions/dubbo-registry-etcd3/src/main/resources/META-INF/dubbo/internal/org.apache.dubbo.registry.client.ServiceDiscoveryFactory
@@ -0,0 +1 @@
+etcd=org.apache.dubbo.registry.etcd.EtcdServiceDiscoveryFactory
diff --git a/dubbo-registry-extensions/dubbo-registry-etcd3/src/test/java/org/apache/dubbo/registry/etcd/EtcdRegistryTest.java b/dubbo-registry-extensions/dubbo-registry-etcd3/src/test/java/org/apache/dubbo/registry/etcd/EtcdRegistryTest.java
new file mode 100644
index 0000000..82e32a5
--- /dev/null
+++ b/dubbo-registry-extensions/dubbo-registry-etcd3/src/test/java/org/apache/dubbo/registry/etcd/EtcdRegistryTest.java
@@ -0,0 +1,329 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dubbo.registry.etcd;
+
+import org.apache.dubbo.common.URL;
+import org.apache.dubbo.common.extension.ExtensionLoader;
+import org.apache.dubbo.common.utils.NetUtils;
+import org.apache.dubbo.registry.NotifyListener;
+import org.apache.dubbo.registry.RegistryFactory;
+import org.apache.dubbo.registry.support.RegistryManager;
+import org.apache.dubbo.remoting.Constants;
+import org.apache.dubbo.rpc.model.ApplicationModel;
+
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Disabled;
+import org.junit.jupiter.api.Test;
+
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+
+import static org.apache.dubbo.common.constants.CommonConstants.ANY_VALUE;
+import static org.apache.dubbo.common.constants.CommonConstants.CLASSIFIER_KEY;
+import static org.apache.dubbo.common.constants.CommonConstants.ENABLED_KEY;
+import static org.apache.dubbo.common.constants.CommonConstants.GROUP_KEY;
+import static org.apache.dubbo.common.constants.CommonConstants.INTERFACE_KEY;
+import static org.apache.dubbo.common.constants.CommonConstants.VERSION_KEY;
+import static org.apache.dubbo.common.constants.RegistryConstants.CATEGORY_KEY;
+import static org.apache.dubbo.common.constants.RegistryConstants.CONFIGURATORS_CATEGORY;
+import static org.apache.dubbo.common.constants.RegistryConstants.CONSUMERS_CATEGORY;
+import static org.apache.dubbo.common.constants.RegistryConstants.PROVIDERS_CATEGORY;
+import static org.apache.dubbo.common.constants.RegistryConstants.ROUTERS_CATEGORY;
+import static org.apache.dubbo.registry.Constants.ADMIN_PROTOCOL;
+
+/**
+ * Tests for {@link EtcdRegistry} obtained through the "etcd3" {@link RegistryFactory} extension.
+ * <p>
+ * The registry URL points at {@code etcd3://127.0.0.1:2379}. The whole class is {@code @Disabled};
+ * NOTE(review): presumably because it needs a reachable etcd server at that address - confirm.
+ */
+@Disabled
+public class EtcdRegistryTest {
+
+    // Interface names used to build the provider URLs below.
+    String service = "org.apache.dubbo.internal.test.DemoServie";
+    String outerService = "org.apache.dubbo.outer.test.OuterDemoServie";
+    // Two provider URLs for the same interface (different method lists) and one for another interface.
+    URL serviceUrl = URL.valueOf("dubbo://" + NetUtils.getLocalHost() + "/" + service + "?methods=test1,test2");
+    URL serviceUrl2 = URL.valueOf("dubbo://" + NetUtils.getLocalHost() + "/" + service + "?methods=test1,test2,test3");
+    URL serviceUrl3 = URL.valueOf("dubbo://" + NetUtils.getLocalHost() + "/" + outerService + "?methods=test1,test2");
+    URL registryUrl = URL.valueOf("etcd3://127.0.0.1:2379/org.apache.dubbo.registry.RegistryService");
+    // Consumer URL for the same interface as serviceUrl / serviceUrl2.
+    URL consumerUrl = URL.valueOf("dubbo://" + NetUtils.getLocalHost() + ":2018" + "/" + service + "?methods=test1,test2");
+    RegistryFactory registryFactory = ExtensionLoader.getExtensionLoader(RegistryFactory.class).getExtension("etcd3", false);
+    // Re-created before every test in setUp().
+    EtcdRegistry registry;
+    // Admin-protocol subscription URL with wildcard interface/group/version/classifier across all four categories.
+    URL subscribe = new URL(
+            ADMIN_PROTOCOL, NetUtils.getLocalHost(), 0, "",
+            INTERFACE_KEY, ANY_VALUE,
+            GROUP_KEY, ANY_VALUE,
+            VERSION_KEY, ANY_VALUE,
+            CLASSIFIER_KEY, ANY_VALUE,
+            CATEGORY_KEY, PROVIDERS_CATEGORY + "," + CONSUMERS_CATEGORY + "," + ROUTERS_CATEGORY + "," + CONFIGURATORS_CATEGORY,
+            ENABLED_KEY, ANY_VALUE,
+            Constants.CHECK_KEY, String.valueOf(false));
+
+    /**
+     * Registering a URL makes it the single entry of {@code getRegistered()}.
+     */
+    @Test
+    public void test_register() {
+
+        registry.register(serviceUrl);
+        Set<URL> registered = registry.getRegistered();
+        Assertions.assertEquals(1, registered.size());
+        Assertions.assertTrue(registered.contains(serviceUrl));
+
+        registry.unregister(serviceUrl);
+    }
+
+    /**
+     * Unregistering a previously registered URL leaves {@code getRegistered()} empty.
+     */
+    @Test
+    public void test_unregister() {
+
+        registry.register(serviceUrl);
+        Set<URL> registered = registry.getRegistered();
+        Assertions.assertEquals(1, registered.size());
+        Assertions.assertTrue(registered.contains(serviceUrl));
+
+        registry.unregister(serviceUrl);
+
+        registered = registry.getRegistered();
+        Assertions.assertEquals(0, registered.size());
+    }
+
+    /**
+     * Subscribing after the provider is registered: the assertion runs right after
+     * {@code subscribe} returns, so the listener must already have received the provider URL.
+     */
+    @Test
+    public void test_subscribe() {
+
+        registry.register(serviceUrl);
+
+        final AtomicReference<URL> notifiedUrl = new AtomicReference<URL>();
+        registry.subscribe(consumerUrl, new NotifyListener() {
+            public void notify(List<URL> urls) {
+                notifiedUrl.set(urls.get(0));
+            }
+        });
+        Assertions.assertEquals(serviceUrl.toFullString(), notifiedUrl.get().toFullString());
+        Map<URL, Set<NotifyListener>> arg = registry.getSubscribed();
+        Assertions.assertEquals(consumerUrl, arg.keySet().iterator().next());
+    }
+
+    /**
+     * Subscribing before the provider is registered: waits up to 15 seconds for two callbacks
+     * and expects the last notified URL to be the registered provider.
+     * NOTE(review): presumably the two callbacks are the initial subscribe notification and the
+     * one triggered by {@code register} - confirm.
+     */
+    @Test
+    public void test_subscribe_when_register() throws InterruptedException {
+
+        Assertions.assertEquals(0, registry.getRegistered().size());
+        Assertions.assertEquals(0, registry.getSubscribed().size());
+
+        CountDownLatch notNotified = new CountDownLatch(2);
+
+        final AtomicReference<URL> notifiedUrl = new AtomicReference<URL>();
+        registry.subscribe(consumerUrl, new NotifyListener() {
+            public void notify(List<URL> urls) {
+                notifiedUrl.set(urls.get(0));
+                notNotified.countDown();
+            }
+        });
+
+        registry.register(serviceUrl);
+
+        Assertions.assertTrue(notNotified.await(15, TimeUnit.SECONDS));
+
+        Assertions.assertEquals(serviceUrl.toFullString(), notifiedUrl.get().toFullString());
+        Map<URL, Set<NotifyListener>> subscribed = registry.getSubscribed();
+        Assertions.assertEquals(consumerUrl, subscribed.keySet().iterator().next());
+    }
+
+    /**
+     * Two providers of the subscribed interface are registered after subscribing; both must
+     * show up among the notified URLs. The latch counts every callback (three expected),
+     * while URLs are collected only when the first one is not an "empty" protocol URL.
+     */
+    @Test
+    public void test_subscribe_when_register0() throws InterruptedException {
+
+        Assertions.assertEquals(0, registry.getRegistered().size());
+        Assertions.assertEquals(0, registry.getSubscribed().size());
+
+        CountDownLatch notNotified = new CountDownLatch(3);
+        ConcurrentHashMap<URL, Boolean> notifiedUrls = new ConcurrentHashMap<>();
+        registry.subscribe(consumerUrl, new NotifyListener() {
+            public void notify(List<URL> urls) {
+                if (urls != null && urls.size() > 0) {
+                    if (!urls.get(0).getProtocol().equals("empty")) {
+                        for (Iterator<URL> iterator = urls.iterator(); iterator.hasNext(); ) {
+                            notifiedUrls.put(iterator.next(), true);
+                        }
+                    }
+                }
+
+                // Counted for every callback, including empty ones.
+                notNotified.countDown();
+            }
+        });
+
+        registry.register(serviceUrl);
+        registry.register(serviceUrl2);
+
+        Assertions.assertTrue(notNotified.await(15, TimeUnit.SECONDS));
+
+        Assertions.assertTrue(notifiedUrls.containsKey(serviceUrl));
+        Assertions.assertTrue(notifiedUrls.containsKey(serviceUrl2));
+        Map<URL, Set<NotifyListener>> subscribed = registry.getSubscribed();
+        Assertions.assertEquals(consumerUrl, subscribed.keySet().iterator().next());
+    }
+
+    /**
+     * A provider of a different interface is registered alongside the subscribed one; the last
+     * notified URL must still be the provider of the subscribed interface.
+     */
+    @Test
+    public void test_subscribe_when_register1() throws InterruptedException {
+
+        Assertions.assertEquals(0, registry.getRegistered().size());
+        Assertions.assertEquals(0, registry.getSubscribed().size());
+
+        CountDownLatch notNotified = new CountDownLatch(2);
+
+        final AtomicReference<URL> notifiedUrls = new AtomicReference<URL>();
+        registry.subscribe(consumerUrl, new NotifyListener() {
+            public void notify(List<URL> urls) {
+                notifiedUrls.set(urls.get(0));
+                notNotified.countDown();
+            }
+        });
+
+        registry.register(serviceUrl);
+        // register service3 should not trigger notify
+        registry.register(serviceUrl3);
+
+        Assertions.assertTrue(notNotified.await(15, TimeUnit.SECONDS));
+
+        Assertions.assertEquals(serviceUrl, notifiedUrls.get());
+        Map<URL, Set<NotifyListener>> subscribed = registry.getSubscribed();
+        Assertions.assertEquals(consumerUrl, subscribed.keySet().iterator().next());
+    }
+
+    /**
+     * Subscribes with the wildcard {@code subscribe} URL and registers three providers across
+     * two interfaces; all three must be notified. Here the latch counts only callbacks whose
+     * first URL is not an "empty" protocol URL.
+     */
+    @Test
+    public void test_subscribe_when_register2() throws InterruptedException {
+
+        Assertions.assertEquals(0, registry.getRegistered().size());
+        Assertions.assertEquals(0, registry.getSubscribed().size());
+
+        CountDownLatch notNotified = new CountDownLatch(3);
+
+        ConcurrentHashMap<URL, Boolean> notifiedUrls = new ConcurrentHashMap<>();
+
+        registry.subscribe(subscribe, new NotifyListener() {
+            public void notify(List<URL> urls) {
+                if (urls != null && urls.size() > 0) {
+                    if (!urls.get(0).getProtocol().equals("empty")) {
+                        for (Iterator<URL> iterator = urls.iterator(); iterator.hasNext(); ) {
+                            notifiedUrls.put(iterator.next(), true);
+                        }
+                        notNotified.countDown();
+                    }
+                }
+            }
+        });
+
+        registry.register(serviceUrl);
+        registry.register(serviceUrl2);
+        // service3's interface is not the same as service2's
+        registry.register(serviceUrl3);
+
+        Assertions.assertTrue(notNotified.await(15, TimeUnit.SECONDS));
+        Assertions.assertEquals(3, notifiedUrls.size());
+        Assertions.assertTrue(notifiedUrls.containsKey(serviceUrl));
+        Assertions.assertTrue(notifiedUrls.containsKey(serviceUrl2));
+        Assertions.assertTrue(notifiedUrls.containsKey(serviceUrl3));
+    }
+
+    /**
+     * After unsubscribing, registering a provider must not reach the listener: the latch is
+     * expected to time out after 2 seconds and no non-"empty" URL may have been recorded.
+     */
+    @Test
+    public void test_unsubscribe() throws InterruptedException {
+
+        Assertions.assertEquals(0, registry.getRegistered().size());
+        Assertions.assertEquals(0, registry.getSubscribed().size());
+
+        CountDownLatch notNotified = new CountDownLatch(2);
+
+        final AtomicReference<URL> notifiedUrl = new AtomicReference<URL>();
+
+        NotifyListener listener = new NotifyListener() {
+            public void notify(List<URL> urls) {
+                if (urls != null) {
+                    for (Iterator<URL> iterator = urls.iterator(); iterator.hasNext(); ) {
+                        URL url = iterator.next();
+                        if (!url.getProtocol().equals("empty")) {
+                            notifiedUrl.set(url);
+                            notNotified.countDown();
+                        }
+                    }
+                }
+            }
+        };
+        registry.subscribe(consumerUrl, listener);
+        registry.unsubscribe(consumerUrl, listener);
+
+        registry.register(serviceUrl);
+
+        Assertions.assertFalse(notNotified.await(2, TimeUnit.SECONDS));
+        // expect nothing happen
+        Assertions.assertNull(notifiedUrl.get());
+    }
+
+    /**
+     * Obtains the registry for {@code registryUrl}. If it reports itself unavailable, all
+     * registries are destroyed through the {@link RegistryManager} and the registry is
+     * requested again from the factory.
+     */
+    @BeforeEach
+    public void setUp() {
+        registry = (EtcdRegistry) registryFactory.getRegistry(registryUrl);
+        Assertions.assertNotNull(registry);
+        if (!registry.isAvailable()) {
+            RegistryManager registryManager = ApplicationModel.defaultModel().getBeanFactory().getBean(RegistryManager.class);
+            registryManager.destroyAll();
+            registry = (EtcdRegistry) registryFactory.getRegistry(registryUrl);
+        }
+    }
+
+    /**
+     * Unregisters every URL the tests may have registered, then destroys the registry.
+     */
+    @AfterEach
+    public void tearDown() {
+
+        registry.unregister(serviceUrl);
+        registry.unregister(serviceUrl2);
+        registry.unregister(serviceUrl3);
+        registry.unregister(subscribe);
+
+        registry.destroy();
+    }
+
+
+}
diff --git a/dubbo-registry-extensions/dubbo-registry-etcd3/src/test/java/org/apache/dubbo/registry/etcd/EtcdServiceDiscoveryTest.java b/dubbo-registry-extensions/dubbo-registry-etcd3/src/test/java/org/apache/dubbo/registry/etcd/EtcdServiceDiscoveryTest.java
new file mode 100644
index 0000000..50f66e6
--- /dev/null
+++ b/dubbo-registry-extensions/dubbo-registry-etcd3/src/test/java/org/apache/dubbo/registry/etcd/EtcdServiceDiscoveryTest.java
@@ -0,0 +1,124 @@
+///*
+// * Licensed to the Apache Software Foundation (ASF) under one or more
+// * contributor license agreements.  See the NOTICE file distributed with
+// * this work for additional information regarding copyright ownership.
+// * The ASF licenses this file to You under the Apache License, Version 2.0
+// * (the "License"); you may not use this file except in compliance with
+// * the License.  You may obtain a copy of the License at
+// *
+// *     http://www.apache.org/licenses/LICENSE-2.0
+// *
+// * Unless required by applicable law or agreed to in writing, software
+// * distributed under the License is distributed on an "AS IS" BASIS,
+// * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// * See the License for the specific language governing permissions and
+// * limitations under the License.
+// */
+//package org.apache.dubbo.registry.etcd;
+//
+//import org.apache.dubbo.common.URL;
+//import org.apache.dubbo.registry.client.DefaultServiceInstance;
+//import org.apache.dubbo.registry.client.ServiceInstance;
+//
+//import com.google.gson.Gson;
+//import org.junit.jupiter.api.AfterAll;
+//import org.junit.jupiter.api.Assertions;
+//import org.junit.jupiter.api.BeforeAll;
+//import org.junit.jupiter.api.Disabled;
+//import org.junit.jupiter.api.Test;
+//
+//import java.util.ArrayList;
+//import java.util.List;
+//
+//import static java.lang.String.valueOf;
+//
+///**
+// * 2019-08-30
+// * <p>
+// * There is no embedded server, so these tests depend on a locally running etcd server.
+// */
+//@Disabled
+//public class EtcdServiceDiscoveryTest {
+//
+//    static EtcdServiceDiscovery etcdServiceDiscovery;
+//
+//    @BeforeAll
+//    public static void setUp() throws Exception {
+//        URL url = URL.valueOf("etcd3://127.0.0.1:2379/org.apache.dubbo.registry.RegistryService");
+//        etcdServiceDiscovery = new EtcdServiceDiscovery();
+//        Assertions.assertNull(etcdServiceDiscovery.etcdClient);
+//        etcdServiceDiscovery.initialize(url);
+//    }
+//
+//    @AfterAll
+//    public static void destroy() throws Exception {
+////        etcdServiceDiscovery.destroy();
+//    }
+//
+//
+//    @Test
+//    public void testLifecycle() throws Exception {
+//        URL url = URL.valueOf("etcd3://127.0.0.1:2233/org.apache.dubbo.registry.RegistryService");
+//        EtcdServiceDiscovery etcdServiceDiscoveryTmp = new EtcdServiceDiscovery();
+//        Assertions.assertNull(etcdServiceDiscoveryTmp.etcdClient);
+//        etcdServiceDiscoveryTmp.initialize(url);
+//        Assertions.assertNotNull(etcdServiceDiscoveryTmp.etcdClient);
+//        Assertions.assertTrue(etcdServiceDiscoveryTmp.etcdClient.isConnected());
+//        etcdServiceDiscoveryTmp.destroy();
+//        Assertions.assertFalse(etcdServiceDiscoveryTmp.etcdClient.isConnected());
+//    }
+//
+//    @Test
+//    public void testRegistry() throws Exception {
+//        ServiceInstance serviceInstance = new DefaultServiceInstance(valueOf(System.nanoTime()), "EtcdTestService", "127.0.0.1", 8080);
+//        Assertions.assertNull(etcdServiceDiscovery.etcdClient.getKVValue(etcdServiceDiscovery.toPath(serviceInstance)));
+//        etcdServiceDiscovery.register(serviceInstance);
+//        Assertions.assertNotNull(etcdServiceDiscovery.etcdClient.getKVValue(etcdServiceDiscovery.toPath(serviceInstance)));
+//    }
+//
+//    @Test
+//    public void testUnRegistry() throws Exception {
+//        ServiceInstance serviceInstance = new DefaultServiceInstance(valueOf(System.nanoTime()), "EtcdTest2Service", "127.0.0.1", 8080);
+//        Assertions.assertNull(etcdServiceDiscovery.etcdClient.getKVValue(etcdServiceDiscovery.toPath(serviceInstance)));
+//        etcdServiceDiscovery.register(serviceInstance);
+//        Assertions.assertNotNull(etcdServiceDiscovery.etcdClient.getKVValue(etcdServiceDiscovery.toPath(serviceInstance)));
+//        etcdServiceDiscovery.unregister(serviceInstance);
+//        Assertions.assertNull(etcdServiceDiscovery.etcdClient.getKVValue(etcdServiceDiscovery.toPath(serviceInstance)));
+//    }
+//
+//    @Test
+//    public void testUpdate() throws Exception {
+//        DefaultServiceInstance serviceInstance = new DefaultServiceInstance(valueOf(System.nanoTime()), "EtcdTest34Service", "127.0.0.1", 8080);
+//        Assertions.assertNull(etcdServiceDiscovery.etcdClient.getKVValue(etcdServiceDiscovery.toPath(serviceInstance)));
+//        etcdServiceDiscovery.register(serviceInstance);
+//        Assertions.assertNotNull(etcdServiceDiscovery.etcdClient.getKVValue(etcdServiceDiscovery.toPath(serviceInstance)));
+//        Assertions.assertEquals(etcdServiceDiscovery.etcdClient.getKVValue(etcdServiceDiscovery.toPath(serviceInstance)),
+//                new Gson().toJson(serviceInstance));
+//        serviceInstance.setPort(9999);
+//        etcdServiceDiscovery.update(serviceInstance);
+//        Assertions.assertNotNull(etcdServiceDiscovery.etcdClient.getKVValue(etcdServiceDiscovery.toPath(serviceInstance)));
+//        Assertions.assertEquals(etcdServiceDiscovery.etcdClient.getKVValue(etcdServiceDiscovery.toPath(serviceInstance)),
+//                new Gson().toJson(serviceInstance));
+//    }
+//
+//    @Test
+//    public void testGetInstances() throws Exception {
+//        String serviceName = "EtcdTest77Service";
+//        Assertions.assertTrue(etcdServiceDiscovery.getInstances(serviceName).isEmpty());
+//        etcdServiceDiscovery.register(new DefaultServiceInstance(valueOf(System.nanoTime()), serviceName, "127.0.0.1", 8080));
+//        etcdServiceDiscovery.register(new DefaultServiceInstance(valueOf(System.nanoTime()), serviceName, "127.0.0.1", 9809));
+//        Assertions.assertFalse(etcdServiceDiscovery.getInstances(serviceName).isEmpty());
+//        List<String> r = convertToIpPort(etcdServiceDiscovery.getInstances(serviceName));
+//        Assertions.assertTrue(r.contains("127.0.0.1:8080"));
+//        Assertions.assertTrue(r.contains("127.0.0.1:9809"));
+//    }
+//
+//    private List<String> convertToIpPort(List<ServiceInstance> serviceInstances) {
+//        List<String> result = new ArrayList<>();
+//        for (ServiceInstance serviceInstance : serviceInstances) {
+//            result.add(serviceInstance.getHost() + ":" + serviceInstance.getPort());
+//        }
+//        return result;
+//    }
+//
+//}
diff --git a/dubbo-registry-extensions/dubbo-registry-redis/pom.xml b/dubbo-registry-extensions/dubbo-registry-redis/pom.xml
new file mode 100644
index 0000000..665f6aa
--- /dev/null
+++ b/dubbo-registry-extensions/dubbo-registry-redis/pom.xml
@@ -0,0 +1,59 @@
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements.  See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+  -->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+    <parent>
+        <artifactId>dubbo-registry-extensions</artifactId>
+        <groupId>org.apache.dubbo.extensions</groupId>
+        <version>${revision}</version>
+        <relativePath>../pom.xml</relativePath>
+    </parent>
+    <modelVersion>4.0.0</modelVersion>
+
+    <version>1.0.0-SNAPSHOT</version>
+    <artifactId>dubbo-registry-redis</artifactId>
+    <packaging>jar</packaging>
+    <name>${project.artifactId}</name>
+    <description>The redis registry module of dubbo project</description>
+    <properties>
+        <skip_maven_deploy>false</skip_maven_deploy>
+    </properties>
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.dubbo</groupId>
+            <artifactId>dubbo-registry-api</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.dubbo.extensions</groupId>
+            <artifactId>dubbo-remoting-redis</artifactId>
+            <version>1.0.0-SNAPSHOT</version>
+        </dependency>
+        <dependency>
+            <groupId>redis.clients</groupId>
+            <artifactId>jedis</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>com.github.codemonstur</groupId>
+            <artifactId>embedded-redis</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.commons</groupId>
+            <artifactId>commons-lang3</artifactId>
+            <scope>test</scope>
+        </dependency>
+    </dependencies>
+</project>
diff --git a/dubbo-registry-extensions/dubbo-registry-redis/src/main/java/org/apache/dubbo/registry/redis/RedisRegistry.java b/dubbo-registry-extensions/dubbo-registry-redis/src/main/java/org/apache/dubbo/registry/redis/RedisRegistry.java
new file mode 100644
index 0000000..a65d6a7
--- /dev/null
+++ b/dubbo-registry-extensions/dubbo-registry-redis/src/main/java/org/apache/dubbo/registry/redis/RedisRegistry.java
@@ -0,0 +1,519 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dubbo.registry.redis;
+
+import org.apache.dubbo.common.URL;
+import org.apache.dubbo.common.URLBuilder;
+import org.apache.dubbo.common.utils.CollectionUtils;
+import org.apache.dubbo.common.utils.ExecutorUtil;
+import org.apache.dubbo.common.utils.NamedThreadFactory;
+import org.apache.dubbo.common.utils.UrlUtils;
+import org.apache.dubbo.registry.NotifyListener;
+import org.apache.dubbo.registry.support.FailbackRegistry;
+import org.apache.dubbo.remoting.redis.RedisClient;
+import org.apache.dubbo.remoting.redis.jedis.ClusterRedisClient;
+import org.apache.dubbo.remoting.redis.jedis.MonoRedisClient;
+import org.apache.dubbo.remoting.redis.jedis.SentinelRedisClient;
+import org.apache.dubbo.rpc.RpcException;
+
+import redis.clients.jedis.JedisPubSub;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static org.apache.dubbo.common.constants.CommonConstants.ANYHOST_VALUE;
+import static org.apache.dubbo.common.constants.CommonConstants.ANY_VALUE;
+import static org.apache.dubbo.common.constants.CommonConstants.GROUP_KEY;
+import static org.apache.dubbo.common.constants.CommonConstants.PATH_SEPARATOR;
+import static org.apache.dubbo.common.constants.RegistryConstants.CATEGORY_KEY;
+import static org.apache.dubbo.common.constants.RegistryConstants.DEFAULT_CATEGORY;
+import static org.apache.dubbo.common.constants.RegistryConstants.DYNAMIC_KEY;
+import static org.apache.dubbo.common.constants.RegistryConstants.EMPTY_PROTOCOL;
+import static org.apache.dubbo.registry.Constants.DEFAULT_REGISTRY_RECONNECT_PERIOD;
+import static org.apache.dubbo.registry.Constants.DEFAULT_SESSION_TIMEOUT;
+import static org.apache.dubbo.registry.Constants.REGISTER;
+import static org.apache.dubbo.registry.Constants.REGISTRY_RECONNECT_PERIOD_KEY;
+import static org.apache.dubbo.registry.Constants.SESSION_TIMEOUT_KEY;
+import static org.apache.dubbo.registry.Constants.UNREGISTER;
+
+/**
+ * RedisRegistry
+ */
+public class RedisRegistry extends FailbackRegistry {
+
+    private static final String REDIS_CLIENT_KEY = "redis-client";
+
+    private static final String MONO_REDIS = "mono";
+
+    private static final String SENTINEL_REDIS = "sentinel";
+
+    private static final String CLUSTER_REDIS = "cluster";
+    private static final String DEFAULT_ROOT = "dubbo";
+
+    private final ScheduledExecutorService expireExecutor = Executors.newScheduledThreadPool(1, new NamedThreadFactory("DubboRegistryExpireTimer", true));
+
+    private final ScheduledFuture<?> expireFuture;
+
+    private final String root;
+
+    private RedisClient redisClient;
+
+    private final ConcurrentMap<String, Notifier> notifiers = new ConcurrentHashMap<>();
+
+    private final int reconnectPeriod;
+
+    private final int expirePeriod;
+
+    private volatile boolean admin = false;
+
+    private final Map<URL, Long> expireCache = new ConcurrentHashMap<>();
+
+    // just for unit test
+    private volatile boolean doExpire = true;
+
+    /**
+     * Creates a registry backed by the redis server(s) described by {@code url}.
+     * <p>
+     * The {@code redis-client} parameter selects the client implementation: {@code sentinel},
+     * {@code cluster}, and any other value (including the default {@code mono}) uses the mono client.
+     * The group parameter (default {@code dubbo}) is normalized to start and end with the path
+     * separator and becomes the prefix of every key this registry builds.
+     * A background task running {@code deferExpired()} is scheduled with an initial delay and a
+     * fixed delay of half the session timeout, in milliseconds.
+     *
+     * @param url the registry URL
+     * @throws IllegalStateException if {@code url.isAnyHost()} is true
+     */
+    public RedisRegistry(URL url) {
+        super(url);
+        // Validate the address first so that no redis client is constructed for a URL
+        // that is going to be rejected anyway.
+        if (url.isAnyHost()) {
+            throw new IllegalStateException("registry address == null");
+        }
+
+        String type = url.getParameter(REDIS_CLIENT_KEY, MONO_REDIS);
+        if (SENTINEL_REDIS.equals(type)) {
+            redisClient = new SentinelRedisClient(url);
+        } else if (CLUSTER_REDIS.equals(type)) {
+            redisClient = new ClusterRedisClient(url);
+        } else {
+            redisClient = new MonoRedisClient(url);
+        }
+
+        this.reconnectPeriod = url.getParameter(REGISTRY_RECONNECT_PERIOD_KEY, DEFAULT_REGISTRY_RECONNECT_PERIOD);
+
+        // Normalize the group into a "/group/" style root prefix.
+        String group = url.getParameter(GROUP_KEY, DEFAULT_ROOT);
+        if (!group.startsWith(PATH_SEPARATOR)) {
+            group = PATH_SEPARATOR + group;
+        }
+        if (!group.endsWith(PATH_SEPARATOR)) {
+            group = group + PATH_SEPARATOR;
+        }
+        this.root = group;
+
+        this.expirePeriod = url.getParameter(SESSION_TIMEOUT_KEY, DEFAULT_SESSION_TIMEOUT);
+        // Refresh twice per expire period so an entry is renewed before it can lapse.
+        this.expireFuture = expireExecutor.scheduleWithFixedDelay(() -> {
+            try {
+                deferExpired(); // Extend the expiration time
+            } catch (Throwable t) { // Defensive fault tolerance
+                logger.error("Unexpected exception occur at defer expire time, cause: " + t.getMessage(), t);
+            }
+        }, expirePeriod / 2, expirePeriod / 2, TimeUnit.MILLISECONDS);
+    }
+
+    /**
+     * Periodic maintenance task scheduled by the constructor.
+     * <ol>
+     *   <li>Rewrites the expiry timestamp of every dynamic URL registered by this instance.</li>
+     *   <li>When {@code doExpire} is set, re-runs notification for cached URLs whose expiry has passed.</li>
+     *   <li>When {@code admin} is set, removes expired entries via {@link #clean()}.</li>
+     * </ol>
+     */
+    private void deferExpired() {
+        // Iterate over a copy so changes to the registered set during the loop do not interfere.
+        for (URL url : new HashSet<>(getRegistered())) {
+            if (url.getParameter(DYNAMIC_KEY, true)) {
+                String key = toCategoryPath(url);
+                // NOTE(review): a return value of 1 presumably means the hash field did not exist before
+                // (Redis HSET semantics), i.e. the entry had been removed and is re-announced — confirm against RedisClient.
+                if (redisClient.hset(key, url.toFullString(), String.valueOf(System.currentTimeMillis() + expirePeriod)) == 1) {
+                    redisClient.publish(key, REGISTER);
+                }
+            }
+        }
+
+        if (doExpire) {
+            // Re-evaluate every category that holds a cached URL whose expiry timestamp is in the past.
+            for (Map.Entry<URL, Long> expireEntry : expireCache.entrySet()) {
+                if (expireEntry.getValue() < System.currentTimeMillis()) {
+                    doNotify(toCategoryPath(expireEntry.getKey()));
+                }
+            }
+        }
+
+        // admin is switched on by doSubscribe when a wildcard service is subscribed.
+        if (admin) {
+            clean();
+        }
+    }
+
+    /**
+     * Scans every key below the root and deletes hash fields of dynamic URLs whose stored
+     * expiry timestamp is earlier than the current time. For each key that lost at least one
+     * field, an {@code UNREGISTER} message is published on that key.
+     */
+    private void clean() {
+        Set<String> categoryKeys = redisClient.scan(root + ANY_VALUE);
+        if (CollectionUtils.isEmpty(categoryKeys)) {
+            return;
+        }
+        for (String categoryKey : categoryKeys) {
+            Map<String, String> fields = redisClient.hgetAll(categoryKey);
+            if (!CollectionUtils.isNotEmptyMap(fields)) {
+                continue;
+            }
+            boolean removedAny = false;
+            long now = System.currentTimeMillis();
+            for (Map.Entry<String, String> field : fields.entrySet()) {
+                String rawUrl = field.getKey();
+                // Non-dynamic URLs never expire, so their timestamp is not even parsed.
+                if (!URL.valueOf(rawUrl).getParameter(DYNAMIC_KEY, true)) {
+                    continue;
+                }
+                long expire = Long.parseLong(field.getValue());
+                if (expire >= now) {
+                    continue;
+                }
+                redisClient.hdel(categoryKey, rawUrl);
+                removedAny = true;
+                if (logger.isWarnEnabled()) {
+                    logger.warn("Delete expired key: " + categoryKey + " -> value: " + rawUrl + ", expire: " + new Date(expire) + ", now: " + new Date(now));
+                }
+            }
+            if (removedAny) {
+                redisClient.publish(categoryKey, UNREGISTER);
+            }
+        }
+    }
+
+    /**
+     * Reports availability by delegating to the redis client's connection check.
+     *
+     * @return the result of {@code redisClient.isConnected()}
+     */
+    @Override
+    public boolean isAvailable() {
+        return redisClient.isConnected();
+    }
+
+    /**
+     * Tears the registry down: runs the superclass destroy logic, cancels the expiry refresh task,
+     * shuts down every notifier thread, destroys the redis client and finally shuts down the
+     * expire executor. Each step is guarded separately so a failure in one does not prevent the
+     * following steps; failures are logged as warnings.
+     */
+    @Override
+    public void destroy() {
+        super.destroy();
+        try {
+            expireFuture.cancel(true);
+        } catch (Throwable t) {
+            logger.warn(t.getMessage(), t);
+        }
+        try {
+            for (Notifier notifier : notifiers.values()) {
+                notifier.shutdown();
+            }
+        } catch (Throwable t) {
+            logger.warn(t.getMessage(), t);
+        }
+        try {
+            redisClient.destroy();
+        } catch (Throwable t) {
+            logger.warn("Failed to destroy the redis registry client. registry: " + getUrl().getAddress() + ", cause: " + t.getMessage(), t);
+        }
+        // expirePeriod is passed as the shutdown timeout argument.
+        ExecutorUtil.gracefulShutdown(expireExecutor, expirePeriod);
+    }
+
+    /**
+     * Registers {@code url} in redis: the full URL string is stored as a hash field under the
+     * URL's category key with "now + expirePeriod" as its value, then a {@code REGISTER} message
+     * is published on that key.
+     *
+     * @param url the URL to register
+     * @throws RpcException wrapping any failure raised by the redis client calls
+     */
+    @Override
+    public void doRegister(URL url) {
+        String key = toCategoryPath(url);
+        String value = url.toFullString();
+        String expire = String.valueOf(System.currentTimeMillis() + expirePeriod);
+        try {
+            redisClient.hset(key, value, expire);
+            redisClient.publish(key, REGISTER);
+        } catch (Throwable t) {
+            throw new RpcException("Failed to register service to redis registry. registry: " + url.getAddress() + ", service: " + url + ", cause: " + t.getMessage(), t);
+        }
+    }
+
+    /**
+     * Removes {@code url} from redis: deletes the hash field holding the full URL string from the
+     * URL's category key, then publishes an {@code UNREGISTER} message on that key.
+     *
+     * @param url the URL to unregister
+     * @throws RpcException wrapping any failure raised by the redis client calls
+     */
+    @Override
+    public void doUnregister(URL url) {
+        String key = toCategoryPath(url);
+        String value = url.toFullString();
+        try {
+            redisClient.hdel(key, value);
+            redisClient.publish(key, UNREGISTER);
+        } catch (Throwable t) {
+            throw new RpcException("Failed to unregister service to redis registry. registry: " + url.getAddress() + ", service: " + url + ", cause: " + t.getMessage(), t);
+        }
+    }
+
+    /**
+     * Subscribes {@code listener} to changes of the service addressed by {@code url}.
+     * <p>
+     * One {@code Notifier} thread is kept per service path; it is created and started the first
+     * time the path is subscribed. Afterwards the current redis content is pushed to the listener
+     * once: for a wildcard service path the matching keys are grouped by service path and notified
+     * group by group (and {@code admin} mode is switched on), otherwise all keys below the service
+     * path are notified together.
+     *
+     * @param url      the subscription URL
+     * @param listener the listener to notify
+     * @throws RpcException wrapping any failure raised while scanning or notifying
+     */
+    @Override
+    public void doSubscribe(final URL url, final NotifyListener listener) {
+        String service = toServicePath(url);
+        if (notifiers.get(service) == null) {
+            Notifier candidate = new Notifier(service);
+            // Only the thread whose instance actually landed in the map starts it.
+            if (notifiers.putIfAbsent(service, candidate) == null) {
+                candidate.start();
+            }
+        }
+        List<NotifyListener> target = Collections.singletonList(listener);
+        try {
+            if (!service.endsWith(ANY_VALUE)) {
+                doNotify(redisClient.scan(service + PATH_SEPARATOR + ANY_VALUE), url, target);
+                return;
+            }
+            admin = true;
+            Set<String> keys = redisClient.scan(service);
+            if (CollectionUtils.isEmpty(keys)) {
+                return;
+            }
+            // Group the category keys by the service path they belong to.
+            Map<String, Set<String>> keysByService = new HashMap<>();
+            for (String key : keys) {
+                keysByService.computeIfAbsent(toServicePath(key), k -> new HashSet<>()).add(key);
+            }
+            for (Set<String> group : keysByService.values()) {
+                doNotify(group, url, target);
+            }
+        } catch (Throwable t) {
+            throw new RpcException("Failed to subscribe service from redis registry. registry: " + url.getAddress() + ", service: " + url + ", cause: " + t.getMessage(), t);
+        }
+    }
+
+    /**
+     * Intentionally empty: nothing is done in redis on unsubscribe, and the notifier created by
+     * {@code doSubscribe} for the service path is left in place.
+     *
+     * @param url      the subscription URL (unused)
+     * @param listener the listener being removed (unused)
+     */
+    @Override
+    public void doUnsubscribe(URL url, NotifyListener listener) {
+    }
+
+    /**
+     * Re-evaluates a single redis key for every current subscription. Filtering by service and
+     * category is left to the three-argument overload.
+     *
+     * @param key the redis key (category path) to re-read
+     */
+    private void doNotify(String key) {
+        // Work on copies of the subscription map and listener sets so the originals can change meanwhile.
+        for (Map.Entry<URL, Set<NotifyListener>> entry : new HashMap<>(getSubscribed()).entrySet()) {
+            doNotify(Collections.singletonList(key), entry.getKey(), new HashSet<>(entry.getValue()));
+        }
+    }
+
+    /**
+     * Reads the given redis keys, collects the URLs that are still valid and match the
+     * subscription {@code url}, and passes the combined list to every listener.
+     * <p>
+     * Does nothing when {@code keys} or {@code listeners} is null or empty, or when no key
+     * survives the service/category filter.
+     *
+     * @param keys      redis keys (category paths) to read
+     * @param url       the subscription URL used for service, category and match filtering
+     * @param listeners the listeners to notify
+     */
+    private void doNotify(Collection<String> keys, URL url, Collection<NotifyListener> listeners) {
+        if (keys == null || keys.isEmpty()
+                || listeners == null || listeners.isEmpty()) {
+            return;
+        }
+        long now = System.currentTimeMillis();
+        List<URL> result = new ArrayList<>();
+        List<String> categories = Arrays.asList(url.getParameter(CATEGORY_KEY, new String[0]));
+        String consumerService = url.getServiceInterface();
+        for (String key : keys) {
+            // Skip keys that belong to another service unless the subscriber asked for any service.
+            if (!ANY_VALUE.equals(consumerService)) {
+                String providerService = toServiceName(key);
+                if (!providerService.equals(consumerService)) {
+                    continue;
+                }
+            }
+            // Skip keys whose category the subscriber did not ask for.
+            String category = toCategoryName(key);
+            if (!categories.contains(ANY_VALUE) && !categories.contains(category)) {
+                continue;
+            }
+            List<URL> urls = new ArrayList<>();
+            // NOTE(review): this snapshot covers the whole expire cache, not only URLs stored under `key`,
+            // so cache entries that were added for a different key are removed below as well — confirm this is intended.
+            Set<URL> toDeleteExpireKeys = new HashSet<>(expireCache.keySet());
+            Map<String, String> values = redisClient.hgetAll(key);
+            if (CollectionUtils.isNotEmptyMap(values)) {
+                for (Map.Entry<String, String> entry : values.entrySet()) {
+                    URL u = URL.valueOf(entry.getKey());
+                    long expire = Long.parseLong(entry.getValue());
+                    // Non-dynamic URLs are always kept; dynamic ones only while not yet expired.
+                    if (!u.getParameter(DYNAMIC_KEY, true)
+                            || expire >= now) {
+                        if (UrlUtils.isMatch(url, u)) {
+                            urls.add(u);
+                            expireCache.put(u, expire);
+                            toDeleteExpireKeys.remove(u);
+                        }
+                    }
+                }
+            }
+
+            // Drop every cached URL that was not confirmed by this key's current content.
+            if (!toDeleteExpireKeys.isEmpty()) {
+                for (URL u : toDeleteExpireKeys) {
+                    expireCache.remove(u);
+                }
+            }
+            // Nothing valid under this key: report an "empty" protocol URL for the category instead.
+            if (urls.isEmpty()) {
+                urls.add(URLBuilder.from(url)
+                        .setProtocol(EMPTY_PROTOCOL)
+                        .setAddress(ANYHOST_VALUE)
+                        .setPath(toServiceName(key))
+                        .addParameter(CATEGORY_KEY, category)
+                        .build());
+            }
+            result.addAll(urls);
+
+            if (logger.isInfoEnabled()) {
+                logger.info("redis notify: " + key + " = " + urls);
+            }
+        }
+        // Empty only when every key was filtered out above.
+        if (CollectionUtils.isEmpty(result)) {
+            return;
+        }
+        for (NotifyListener listener : listeners) {
+            notify(url, listener, result);
+        }
+    }
+
+    /**
+     * Extracts the service name from a category path: the service path with the root prefix
+     * removed when it is present.
+     */
+    private String toServiceName(String categoryPath) {
+        String servicePath = toServicePath(categoryPath);
+        if (!servicePath.startsWith(root)) {
+            return servicePath;
+        }
+        return servicePath.substring(root.length());
+    }
+
+    /**
+     * Returns the text after the last path separator of {@code categoryPath}, or the whole
+     * input when no separator is found beyond index 0.
+     */
+    private String toCategoryName(String categoryPath) {
+        int lastSeparator = categoryPath.lastIndexOf(PATH_SEPARATOR);
+        if (lastSeparator <= 0) {
+            return categoryPath;
+        }
+        return categoryPath.substring(lastSeparator + 1);
+    }
+
+    /**
+     * Cuts a category path down to its service path, i.e. everything before the first path
+     * separator that follows the root prefix (or before the first separator at all when the
+     * path does not start with the root). Returns the input unchanged when no such separator
+     * is found beyond index 0.
+     */
+    private String toServicePath(String categoryPath) {
+        // Start searching behind the root prefix so its own separators are not taken as the boundary.
+        int searchFrom = categoryPath.startsWith(root) ? root.length() : 0;
+        int boundary = categoryPath.indexOf(PATH_SEPARATOR, searchFrom);
+        if (boundary > 0) {
+            return categoryPath.substring(0, boundary);
+        }
+        return categoryPath;
+    }
+
+    /**
+     * Builds the service path for {@code url}: the root prefix followed by the URL's service interface.
+     */
+    private String toServicePath(URL url) {
+        return root + url.getServiceInterface();
+    }
+
+    /**
+     * Builds the redis key for {@code url}: its service path, a path separator, and the URL's
+     * category parameter (falling back to {@code DEFAULT_CATEGORY}).
+     */
+    private String toCategoryPath(URL url) {
+        return toServicePath(url) + PATH_SEPARATOR + url.getParameter(CATEGORY_KEY, DEFAULT_CATEGORY);
+    }
+
+    /**
+     * Pub/sub callback handed to the redis client by {@code Notifier}. Only message callbacks do
+     * any work; the subscribe/unsubscribe callbacks are deliberately empty.
+     */
+    private class NotifySub extends JedisPubSub {
+        public NotifySub() {}
+
+        /**
+         * Handles a message published on {@code key}. For {@code REGISTER} and {@code UNREGISTER}
+         * messages the key is re-read and subscribers are notified; other messages are only logged.
+         * Failures during notification are logged and not propagated.
+         */
+        @Override
+        public void onMessage(String key, String msg) {
+            if (logger.isInfoEnabled()) {
+                logger.info("redis event: " + key + " = " + msg);
+            }
+            if (msg.equals(REGISTER)
+                    || msg.equals(UNREGISTER)) {
+                try {
+                    doNotify(key);
+                } catch (Throwable t) { // TODO Notification failure does not restore mechanism guarantee
+                    logger.error(t.getMessage(), t);
+                }
+            }
+        }
+
+        /** Pattern-subscription variant; the pattern is ignored and handling is delegated to {@link #onMessage}. */
+        @Override
+        public void onPMessage(String pattern, String key, String msg) {
+            onMessage(key, msg);
+        }
+
+        @Override
+        public void onSubscribe(String key, int num) {
+        }
+
+        @Override
+        public void onPSubscribe(String pattern, int num) {
+        }
+
+        @Override
+        public void onUnsubscribe(String key, int num) {
+        }
+
+        @Override
+        public void onPUnsubscribe(String pattern, int num) {
+        }
+
+    }
+
+    /**
+     * Daemon thread that keeps a pattern subscription open for one service path and pushes the
+     * current redis content to subscribers once, before the first subscription attempt.
+     * <p>
+     * The loop keeps running until {@link #shutdown()} clears the {@code running} flag.
+     * Connection attempts are thinned out by a skip counter (see {@link #isSkip()}), and every
+     * path that could not subscribe sleeps for {@code reconnectPeriod} before the next round.
+     */
+    private class Notifier extends Thread {
+
+        private final String service;
+        // Number of loop rounds to skip before the next attempt; grows with every attempt.
+        private final AtomicInteger connectSkip = new AtomicInteger();
+        // Number of rounds skipped since the last attempt.
+        private final AtomicInteger connectSkipped = new AtomicInteger();
+
+        // True until the initial notification has been attempted.
+        private volatile boolean first = true;
+        private volatile boolean running = true;
+        // Random extra skip count used once connectSkip has reached 10; 0 means "not drawn yet".
+        private volatile int connectRandom;
+
+        public Notifier(String service) {
+            super.setDaemon(true);
+            super.setName("DubboRedisSubscribe");
+            this.service = service;
+        }
+
+        /** Clears all skip bookkeeping so the next round attempts immediately. */
+        private void resetSkip() {
+            connectSkip.set(0);
+            connectSkipped.set(0);
+            connectRandom = 0;
+        }
+
+        /**
+         * Decides whether the current loop round should be skipped.
+         *
+         * @return {@code true} while fewer rounds have been skipped than currently required
+         */
+        private boolean isSkip() {
+            int skip = connectSkip.get(); // Growth of skipping times
+            if (skip >= 10) { // If the number of skipping times increases by more than 10, take the random number
+                if (connectRandom == 0) {
+                    connectRandom = ThreadLocalRandom.current().nextInt(10);
+                }
+                skip = 10 + connectRandom;
+            }
+            if (connectSkipped.getAndIncrement() < skip) { // Check the number of skipping times
+                return true;
+            }
+            connectSkip.incrementAndGet();
+            connectSkipped.set(0);
+            connectRandom = 0;
+            return false;
+        }
+
+        @Override
+        public void run() {
+            while (running) {
+                try {
+                    if (!isSkip()) {
+                        try {
+                            if (!redisClient.isConnected()) {
+                                // Back off before checking again: this path has no other blocking call,
+                                // so without the sleep the loop would spin at full speed while redis is unreachable.
+                                sleep(reconnectPeriod);
+                                continue;
+                            }
+                            try {
+                                if (service.endsWith(ANY_VALUE)) {
+                                    if (first) {
+                                        first = false;
+                                        Set<String> keys = redisClient.scan(service);
+                                        if (CollectionUtils.isNotEmpty(keys)) {
+                                            for (String s : keys) {
+                                                doNotify(s);
+                                            }
+                                        }
+                                        resetSkip();
+                                    }
+                                    redisClient.psubscribe(new NotifySub(), service);
+                                } else {
+                                    if (first) {
+                                        first = false;
+                                        doNotify(service);
+                                        resetSkip();
+                                    }
+                                    redisClient.psubscribe(new NotifySub(), service + PATH_SEPARATOR + ANY_VALUE); // blocking
+                                }
+                            } catch (Throwable t) { // Retry another server
+                                logger.warn("Failed to subscribe service from redis registry. registry: " + getUrl().getAddress() + ", cause: " + t.getMessage(), t);
+                                // If you only have a single redis, you need to take a rest to avoid overtaking a lot of CPU resources
+                                sleep(reconnectPeriod);
+                            }
+                        } catch (Throwable t) {
+                            logger.error(t.getMessage(), t);
+                            sleep(reconnectPeriod);
+                        }
+                    }
+                } catch (Throwable t) {
+                    logger.error(t.getMessage(), t);
+                }
+            }
+        }
+
+        /**
+         * Stops the loop and disconnects the redis client. Failures are logged as warnings.
+         */
+        public void shutdown() {
+            try {
+                running = false;
+                redisClient.disconnect();
+            } catch (Throwable t) {
+                logger.warn(t.getMessage(), t);
+            }
+        }
+
+    }
+
+}
diff --git a/dubbo-registry-extensions/dubbo-registry-redis/src/main/java/org/apache/dubbo/registry/redis/RedisRegistryFactory.java b/dubbo-registry-extensions/dubbo-registry-redis/src/main/java/org/apache/dubbo/registry/redis/RedisRegistryFactory.java
new file mode 100644
index 0000000..ec1b5ea
--- /dev/null
+++ b/dubbo-registry-extensions/dubbo-registry-redis/src/main/java/org/apache/dubbo/registry/redis/RedisRegistryFactory.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dubbo.registry.redis;
+
+import org.apache.dubbo.common.URL;
+import org.apache.dubbo.registry.Registry;
+import org.apache.dubbo.registry.support.AbstractRegistryFactory;
+
+/**
+ * Registry factory that creates {@link RedisRegistry} instances.
+ */
+public class RedisRegistryFactory extends AbstractRegistryFactory {
+
+    /**
+     * Creates a new {@link RedisRegistry} for the given registry URL.
+     *
+     * @param url the registry URL
+     * @return a newly constructed redis registry
+     */
+    @Override
+    protected Registry createRegistry(URL url) {
+        return new RedisRegistry(url);
+    }
+
+}
diff --git a/dubbo-registry-extensions/dubbo-registry-redis/src/main/resources/META-INF/dubbo/internal/org.apache.dubbo.registry.RegistryFactory b/dubbo-registry-extensions/dubbo-registry-redis/src/main/resources/META-INF/dubbo/internal/org.apache.dubbo.registry.RegistryFactory
new file mode 100644
index 0000000..3214101
--- /dev/null
+++ b/dubbo-registry-extensions/dubbo-registry-redis/src/main/resources/META-INF/dubbo/internal/org.apache.dubbo.registry.RegistryFactory
@@ -0,0 +1 @@
+redis=org.apache.dubbo.registry.redis.RedisRegistryFactory
\ No newline at end of file
diff --git a/dubbo-registry-extensions/dubbo-registry-redis/src/test/java/org/apache/dubbo/registry/redis/RedisRegistryTest.java b/dubbo-registry-extensions/dubbo-registry-redis/src/test/java/org/apache/dubbo/registry/redis/RedisRegistryTest.java
new file mode 100644
index 0000000..303971d
--- /dev/null
+++ b/dubbo-registry-extensions/dubbo-registry-redis/src/test/java/org/apache/dubbo/registry/redis/RedisRegistryTest.java
@@ -0,0 +1,250 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dubbo.registry.redis;
+
+import org.apache.dubbo.common.URL;
+import org.apache.dubbo.common.utils.NetUtils;
+import org.apache.dubbo.registry.NotifyListener;
+import org.apache.dubbo.registry.Registry;
+import org.apache.dubbo.registry.support.AbstractRegistry;
+
+import org.apache.commons.lang3.SystemUtils;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import redis.clients.jedis.exceptions.JedisConnectionException;
+import redis.embedded.RedisServer;
+
+import java.io.IOException;
+import java.lang.reflect.Field;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.Set;
+
+import static org.apache.dubbo.common.constants.RemotingConstants.BACKUP_KEY;
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static redis.embedded.RedisServer.newRedisServer;
+
+public class RedisRegistryTest {
+
+    private static final String SERVICE = "org.apache.dubbo.test.injvmServie";
+    private static final URL SERVICE_URL = URL.valueOf("redis://redis/" + SERVICE + "?notify=false&methods=test1,test2&category=providers,configurators,routers");
+    private static final URL PROVIDER_URL_A = URL.valueOf("redis://127.0.0.1:20880/" + SERVICE + "?notify=false&methods=test1,test2");
+    private static final URL PROVIDER_URL_B = URL.valueOf("redis://127.0.0.1:20881/" + SERVICE + "?notify=false&methods=test1,test2");
+
+    private RedisServer redisServer;
+    private RedisRegistry redisRegistry;
+    private URL registryUrl;
+
+    /**
+     * Starts an embedded redis server on a random available port (retrying up to 10 times when
+     * start-up fails with an {@link IOException}) and creates the registry under test against it.
+     */
+    @BeforeEach
+    public void setUp() throws Exception {
+        int redisPort = 0;
+        IOException exception = null;
+
+        for (int i = 0; i < 10; i++) {
+            try {
+                // Ask for an available port, starting from a random candidate in [30000, 40000).
+                redisPort = NetUtils.getAvailablePort(30000 + new Random().nextInt(10000));
+                redisServer = newRedisServer()
+                        .port(redisPort)
+                        // set maxheap to fix Windows error 0x70 while starting redis
+                        .settingIf(SystemUtils.IS_OS_WINDOWS, "maxheap 128mb")
+                        .build();
+                this.redisServer.start();
+                exception = null;
+            } catch (IOException e) {
+                e.printStackTrace();
+                exception = e;
+            }
+            // Stop retrying as soon as one attempt succeeded.
+            if (exception == null) {
+                break;
+            }
+        }
+        // Fail the test when all attempts failed.
+        Assertions.assertNull(exception);
+        registryUrl = URL.valueOf("redis://localhost:" + redisPort + "?session=4000");
+        redisRegistry = (RedisRegistry) new RedisRegistryFactory().createRegistry(registryUrl);
+    }
+
+    /**
+     * Stops the embedded redis server. Only the server is stopped here; the registry created in
+     * {@code setUp} is not destroyed.
+     */
+    @AfterEach
+    public void tearDown() throws Exception {
+        this.redisServer.stop();
+    }
+
+    /**
+     * Registering the same URL twice must keep it registered and must not create a duplicate entry.
+     */
+    @Test
+    public void testRegister() {
+        for (int attempt = 0; attempt < 2; attempt++) {
+            redisRegistry.register(SERVICE_URL);
+            assertThat(redisRegistry.getRegistered().contains(SERVICE_URL), is(true));
+        }
+
+        assertThat(redisRegistry.getRegistered().size(), is(1));
+    }
+
+    /**
+     * Creating a registry for the any-host URL {@code multicast://0.0.0.0/} must fail with an
+     * {@link IllegalStateException}.
+     */
+    @Test
+    public void testAnyHost() {
+        assertThrows(IllegalStateException.class, () -> {
+            URL errorUrl = URL.valueOf("multicast://0.0.0.0/");
+            new RedisRegistryFactory().createRegistry(errorUrl);
+        });
+    }
+
+    /**
+     * After subscribing, both registered providers must be present in the registry's private
+     * {@code expireCache}; after provider A is unregistered its entry must disappear from the
+     * cache within the polling window (30 polls, 500 ms apart).
+     */
+    @Test
+    public void testSubscribeExpireCache() throws Exception {
+        redisRegistry.register(PROVIDER_URL_A);
+        redisRegistry.register(PROVIDER_URL_B);
+
+        // The listener's content is irrelevant here; only the cache side effect is checked.
+        NotifyListener listener = new NotifyListener() {
+            @Override
+            public void notify(List<URL> urls) {
+            }
+        };
+
+        redisRegistry.subscribe(SERVICE_URL, listener);
+
+        // Read the private cache through reflection.
+        Field expireCache = RedisRegistry.class.getDeclaredField("expireCache");
+        expireCache.setAccessible(true);
+        Map<URL, Long> cacheExpire = (Map<URL, Long>)expireCache.get(redisRegistry);
+
+        assertThat(cacheExpire.get(PROVIDER_URL_A) > 0, is(true));
+        assertThat(cacheExpire.get(PROVIDER_URL_B) > 0, is(true));
+
+        redisRegistry.unregister(PROVIDER_URL_A);
+
+        boolean success = false;
+
+        // The cache is updated asynchronously, so poll until provider A is gone.
+        for (int i = 0; i < 30; i++) {
+            cacheExpire = (Map<URL, Long>)expireCache.get(redisRegistry);
+            if (cacheExpire.get(PROVIDER_URL_A) == null) {
+                success = true;
+                break;
+            }
+            Thread.sleep(500);
+        }
+        assertThat(success, is(true));
+    }
+
+    /**
+     * Verifies that a subscriber is first notified with both providers and, once provider B
+     * has been removed from the registry's local {@code registered} set, is eventually
+     * notified with provider A only.
+     */
+    @Test
+    public void testSubscribeWhenProviderCrash() throws Exception {
+
+        // unit test will fail if doExpire=false
+        // Field doExpireField = RedisRegistry.class.getDeclaredField("doExpire");
+        // doExpireField.setAccessible(true);
+        // doExpireField.set(redisRegistry, false);
+
+        redisRegistry.register(PROVIDER_URL_A);
+        redisRegistry.register(PROVIDER_URL_B);
+        assertThat(redisRegistry.getRegistered().contains(PROVIDER_URL_A), is(true));
+        assertThat(redisRegistry.getRegistered().contains(PROVIDER_URL_B), is(true));
+
+        // notifiedUrls always holds the URLs of the most recent notification;
+        // lock guards it because the listener writes it and the loops below read it.
+        Set<URL> notifiedUrls = new HashSet<>();
+        Object lock = new Object();
+
+        NotifyListener listener = new NotifyListener() {
+            @Override
+            public void notify(List<URL> urls) {
+                synchronized (lock) {
+                    // Replace, not merge: each notification is treated as the full current list.
+                    notifiedUrls.clear();
+                    notifiedUrls.addAll(urls);
+                }
+            }
+        };
+
+        redisRegistry.subscribe(SERVICE_URL, listener);
+        assertThat(redisRegistry.getSubscribed().size(), is(1));
+
+        boolean firstOk = false;
+        boolean secondOk = false;
+
+        // Phase 1: poll up to 30 times, 500 ms apart, until both providers have been notified.
+        for (int i = 0; i < 30; i++) {
+            synchronized (lock) {
+                if (notifiedUrls.contains(PROVIDER_URL_A) && notifiedUrls.contains(PROVIDER_URL_B)) {
+                    firstOk = true;
+                    break;
+                }
+            }
+            Thread.sleep(500);
+        }
+
+        assertThat(firstOk, is(true));
+
+        // kill -9 to providerB: simulate a crash by removing B from AbstractRegistry's
+        // "registered" set via reflection, without calling unregister.
+        Field registeredField = AbstractRegistry.class.getDeclaredField("registered");
+        registeredField.setAccessible(true);
+        ((Set<URL>) registeredField.get(redisRegistry)).remove(PROVIDER_URL_B);
+
+        // Phase 2: poll with the same budget until the latest notification contains only provider A.
+        for (int i = 0; i < 30; i++) {
+            synchronized (lock) {
+                if (notifiedUrls.contains(PROVIDER_URL_A) && notifiedUrls.size() == 1) {
+                    secondOk = true;
+                    break;
+                }
+            }
+            Thread.sleep(500);
+        }
+        assertThat(secondOk, is(true));
+    }
+
+    /**
+     * Subscribing adds one listener for the service URL; unsubscribing the same
+     * listener leaves an empty listener set for that URL.
+     */
+    @Test
+    public void testSubscribeAndUnsubscribe() {
+        NotifyListener noopListener = new NotifyListener() {
+            @Override
+            public void notify(List<URL> urls) {
+                // nothing to do: only the subscription bookkeeping is under test
+            }
+        };
+
+        redisRegistry.subscribe(SERVICE_URL, noopListener);
+        Map<URL, Set<NotifyListener>> afterSubscribe = redisRegistry.getSubscribed();
+        assertThat(afterSubscribe.size(), is(1));
+        assertThat(afterSubscribe.get(SERVICE_URL).size(), is(1));
+
+        redisRegistry.unsubscribe(SERVICE_URL, noopListener);
+        Map<URL, Set<NotifyListener>> afterUnsubscribe = redisRegistry.getSubscribed();
+        assertThat(afterUnsubscribe.get(SERVICE_URL).size(), is(0));
+    }
+
+    /**
+     * The registry reports itself available while running; after {@code destroy()}
+     * the availability check is expected to throw a {@link JedisConnectionException}.
+     */
+    @Test
+    public void testAvailable() {
+        redisRegistry.register(SERVICE_URL);
+        boolean availableWhileRunning = redisRegistry.isAvailable();
+        assertThat(availableWhileRunning, is(true));
+
+        redisRegistry.destroy();
+        assertThrows(JedisConnectionException.class, redisRegistry::isAvailable);
+    }
+
+    /**
+     * With the addresses {@code redisOne:8880} and backup {@code redisTwo:8881} the availability
+     * check is expected to throw; with the test server's URL plus the same backup address the
+     * registry is expected to be available.
+     */
+    @Test
+    public void testAvailableWithBackup() {
+        URL unreachableUrl = URL.valueOf("redis://redisOne:8880").addParameter(BACKUP_KEY, "redisTwo:8881");
+        Registry unreachable = new RedisRegistryFactory().createRegistry(unreachableUrl);
+        assertThrows(JedisConnectionException.class, () -> unreachable.isAvailable());
+
+        URL reachableUrl = URL.valueOf(this.registryUrl.toFullString()).addParameter(BACKUP_KEY, "redisTwo:8881");
+        Registry reachable = new RedisRegistryFactory().createRegistry(reachableUrl);
+        assertThat(reachable.isAvailable(), is(true));
+    }
+}
diff --git a/dubbo-registry-extensions/dubbo-registry-sofa/pom.xml b/dubbo-registry-extensions/dubbo-registry-sofa/pom.xml
new file mode 100644
index 0000000..4a4ec00
--- /dev/null
+++ b/dubbo-registry-extensions/dubbo-registry-sofa/pom.xml
@@ -0,0 +1,130 @@
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements.  See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+  -->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <parent>
+        <artifactId>dubbo-registry-extensions</artifactId>
+        <groupId>org.apache.dubbo.extensions</groupId>
+        <version>${revision}</version>
+        <relativePath>../pom.xml</relativePath>
+    </parent>
+    <modelVersion>4.0.0</modelVersion>
+
+    <version>1.0.0-SNAPSHOT</version>
+    <artifactId>dubbo-registry-sofa</artifactId>
+    <name>${project.artifactId}</name>
+    <description>The SOFARegistry module of Dubbo project</description>
+
+    <properties>
+        <!-- Version of the javax.ws.rs-api test dependency declared further down in this POM. -->
+        <javax.ws.rs.version>2.1</javax.ws.rs.version>
+        <!-- NOTE(review): not referenced anywhere in this POM; presumably consumed by the
+             test JVM configuration inherited from the parent POM. Confirm before renaming. -->
+        <argline>-Dnetwork_interface_denylist=docker0</argline>
+    </properties>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.dubbo</groupId>
+            <artifactId>dubbo-registry-api</artifactId>
+            <optional>true</optional>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.dubbo</groupId>
+            <artifactId>dubbo-common</artifactId>
+            <optional>true</optional>
+        </dependency>
+
+        <dependency>
+            <groupId>com.alipay.sofa</groupId>
+            <artifactId>registry-client-all</artifactId>
+            <exclusions>
+                <exclusion>
+                    <groupId>com.alipay.sofa</groupId>
+                    <artifactId>sofa-common-tools</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.dubbo</groupId>
+            <artifactId>dubbo-rpc-dubbo</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.dubbo</groupId>
+            <artifactId>dubbo-remoting-netty4</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.dubbo</groupId>
+            <artifactId>dubbo-serialization-hessian2</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>ch.qos.logback</groupId>
+            <artifactId>logback-classic</artifactId>
+            <scope>test</scope>
+        </dependency>
+
+        <!-- test modules -->
+        <dependency>
+            <groupId>com.alipay.sofa</groupId>
+            <artifactId>registry-test</artifactId>
+            <scope>test</scope>
+            <exclusions>
+                <exclusion>
+                    <artifactId>log4j-over-slf4j</artifactId>
+                    <groupId>org.slf4j</groupId>
+                </exclusion>
+                <exclusion>
+                    <artifactId>log4j-jcl</artifactId>
+                    <groupId>org.apache.logging.log4j</groupId>
+                </exclusion>
+                <exclusion>
+                    <artifactId>log4j-core</artifactId>
+                    <groupId>org.apache.logging.log4j</groupId>
+                </exclusion>
+                <exclusion>
+                    <artifactId>log4j-api</artifactId>
+                    <groupId>org.apache.logging.log4j</groupId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+
+        <dependency>
+            <groupId>org.jboss.resteasy</groupId>
+            <artifactId>resteasy-jaxrs</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.jboss.resteasy</groupId>
+            <artifactId>resteasy-netty4</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>javax.ws.rs</groupId>
+            <artifactId>javax.ws.rs-api</artifactId>
+            <version>${javax.ws.rs.version}</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>javax.xml.bind</groupId>
+            <artifactId>jaxb-api</artifactId>
+            <scope>test</scope>
+        </dependency>
+
+    </dependencies>
+</project>
diff --git a/dubbo-registry-extensions/dubbo-registry-sofa/src/main/java/org/apache/dubbo/registry/sofa/SofaRegistry.java b/dubbo-registry-extensions/dubbo-registry-sofa/src/main/java/org/apache/dubbo/registry/sofa/SofaRegistry.java
new file mode 100644
index 0000000..60debfa
--- /dev/null
+++ b/dubbo-registry-extensions/dubbo-registry-sofa/src/main/java/org/apache/dubbo/registry/sofa/SofaRegistry.java
@@ -0,0 +1,295 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dubbo.registry.sofa;
+
+import org.apache.dubbo.common.URL;
+import org.apache.dubbo.common.utils.StringUtils;
+import org.apache.dubbo.registry.NotifyListener;
+import org.apache.dubbo.registry.support.FailbackRegistry;
+
+import com.alipay.sofa.registry.client.api.RegistryClient;
+import com.alipay.sofa.registry.client.api.RegistryClientConfig;
+import com.alipay.sofa.registry.client.api.Subscriber;
+import com.alipay.sofa.registry.client.api.model.RegistryType;
+import com.alipay.sofa.registry.client.api.model.UserData;
+import com.alipay.sofa.registry.client.api.registration.PublisherRegistration;
+import com.alipay.sofa.registry.client.api.registration.SubscriberRegistration;
+import com.alipay.sofa.registry.client.provider.DefaultRegistryClient;
+import com.alipay.sofa.registry.client.provider.DefaultRegistryClientConfigBuilder;
+import com.alipay.sofa.registry.core.model.ScopeEnum;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.dubbo.common.constants.CommonConstants.APPLICATION_KEY;
+import static org.apache.dubbo.common.constants.CommonConstants.DUBBO;
+import static org.apache.dubbo.common.constants.CommonConstants.GROUP_KEY;
+import static org.apache.dubbo.common.constants.CommonConstants.VERSION_KEY;
+import static org.apache.dubbo.registry.Constants.CONSUMER_PROTOCOL;
+import static org.apache.dubbo.registry.Constants.PROVIDER_PROTOCOL;
+import static org.apache.dubbo.registry.Constants.REGISTER_KEY;
+import static org.apache.dubbo.registry.Constants.SUBSCRIBE_KEY;
+import static org.apache.dubbo.registry.sofa.SofaRegistryConstants.ADDRESS_WAIT_TIME_KEY;
+import static org.apache.dubbo.registry.sofa.SofaRegistryConstants.DEFAULT_GROUP;
+import static org.apache.dubbo.registry.sofa.SofaRegistryConstants.LOCAL_DATA_CENTER;
+import static org.apache.dubbo.registry.sofa.SofaRegistryConstants.LOCAL_REGION;
+
+/**
+ * The Sofa registry.
+ *
+ * @since 2.7.2
+ */
+public class SofaRegistry extends FailbackRegistry {
+
+    /**
+     * Cache subscriber by dataId
+     */
+    private final Map<String, Subscriber> subscribers = new ConcurrentHashMap<>();
+
+    /**
+     * Direct registry client
+     */
+    private RegistryClient registryClient;
+    /**
+     * wait address from registry
+     */
+    private int waitAddressTimeout;
+
+    /**
+     * Creates a SOFA registry for the given registry URL.
+     * <p>
+     * The registry client is obtained from {@link #buildClient(URL)}. The address wait
+     * timeout is read from the system property named by {@code ADDRESS_WAIT_TIME_KEY}
+     * and falls back to {@code 5000} when the property is not set.
+     *
+     * @param url the url
+     */
+    public SofaRegistry(URL url) {
+        super(url);
+        if (logger.isInfoEnabled()) {
+            logger.info("Build sofa registry by url:" + url);
+        }
+        this.registryClient = buildClient(url);
+
+        String configuredWait = System.getProperty(ADDRESS_WAIT_TIME_KEY, "5000");
+        this.waitAddressTimeout = Integer.parseInt(configuredWait);
+    }
+
+    /**
+     * Creates and initializes the registry client for the given registry URL.
+     * <p>
+     * Host and port of the endpoint are taken from the URL; data center and zone are set
+     * to {@code LOCAL_DATA_CENTER} and {@code LOCAL_REGION}.
+     *
+     * @param url the url
+     * @return the initialized registry client
+     */
+    protected RegistryClient buildClient(URL url) {
+        String endpointHost = url.getHost();
+        int endpointPort = url.getPort();
+
+        RegistryClientConfig clientConfig = DefaultRegistryClientConfigBuilder.start()
+                .setDataCenter(LOCAL_DATA_CENTER)
+                .setZone(LOCAL_REGION)
+                .setRegistryEndpoint(endpointHost)
+                .setRegistryEndpointPort(endpointPort)
+                .build();
+
+        DefaultRegistryClient client = new DefaultRegistryClient(clientConfig);
+        client.init();
+        return client;
+    }
+
+    /**
+     * Always reports the registry as available; the registry client is not consulted.
+     *
+     * @return {@code true} unconditionally
+     */
+    @Override
+    public boolean isAvailable() {
+        return true;
+    }
+
+    /**
+     * Publishes the URL's full string under the service name derived from the URL.
+     * <p>
+     * Nothing is published when the {@code REGISTER_KEY} parameter is {@code false}
+     * or when the URL uses the consumer protocol.
+     *
+     * @param url the url to publish
+     */
+    @Override
+    public void doRegister(URL url) {
+        boolean registerEnabled = url.getParameter(REGISTER_KEY, true);
+        if (!registerEnabled) {
+            return;
+        }
+        if (CONSUMER_PROTOCOL.equals(url.getProtocol())) {
+            return;
+        }
+
+        String dataId = buildServiceName(url);
+        String payload = url.toFullString();
+
+        PublisherRegistration publisherRegistration = new PublisherRegistration(dataId);
+        addAttributesForPub(publisherRegistration);
+        registryClient.register(publisherRegistration, payload);
+    }
+
+    /**
+     * Applies attributes to a publisher registration before it is handed to the registry
+     * client. This implementation only sets the group to {@code DEFAULT_GROUP}.
+     *
+     * @param publisherRegistration the publisher registration to modify
+     */
+    protected void addAttributesForPub(PublisherRegistration publisherRegistration) {
+        publisherRegistration.setGroup(DEFAULT_GROUP);
+    }
+
+    /**
+     * Unregisters the publisher for the service name derived from the URL.
+     * <p>
+     * Skipped under the same conditions as registration: the {@code REGISTER_KEY} parameter
+     * is {@code false} or the URL uses the consumer protocol.
+     *
+     * @param url the url to withdraw
+     */
+    @Override
+    public void doUnregister(URL url) {
+        boolean wasPublishable = url.getParameter(REGISTER_KEY, true)
+                && !CONSUMER_PROTOCOL.equals(url.getProtocol());
+        if (wasPublishable) {
+            String dataId = buildServiceName(url);
+            registryClient.unregister(dataId, DEFAULT_GROUP, RegistryType.PUBLISHER);
+        }
+    }
+
+    /**
+     * Subscribes the listener to the provider addresses of the service derived from the URL.
+     * <p>
+     * Nothing happens when the {@code SUBSCRIBE_KEY} parameter is {@code false} or the URL
+     * uses the provider protocol. If a subscriber for the service name is already cached, its
+     * current data is replayed to the listener and no new subscription is created. Otherwise a
+     * new subscriber is registered and cached, and the call waits up to
+     * {@code waitAddressTimeout} milliseconds for its first data callback.
+     *
+     * @param url      the consumer url
+     * @param listener the listener to notify with provider urls
+     */
+    @Override
+    public void doSubscribe(URL url, final NotifyListener listener) {
+        if (!url.getParameter(SUBSCRIBE_KEY, true)
+                || PROVIDER_PROTOCOL.equals(url.getProtocol())) {
+            return;
+        }
+
+        String serviceName = buildServiceName(url);
+        // com.alipay.test.TestService:1.0:group@dubbo
+        Subscriber listSubscriber = subscribers.get(serviceName);
+
+        if (listSubscriber != null) {
+            logger.warn("Service name [" + serviceName + "] has been registered in SOFARegistry.");
+
+            // NOTE(review): this listener only receives the currently cached data once; it is
+            // not attached to the existing subscriber's callback, which still targets the
+            // listener captured when the subscriber was first registered.
+            CountDownLatch countDownLatch = new CountDownLatch(1);
+            handleRegistryData(listSubscriber.peekData(), listener, countDownLatch);
+            waitAddress(serviceName, countDownLatch);
+            return;
+        }
+
+        // Released by the first data callback, so the wait below ends as soon as addresses arrive.
+        final CountDownLatch latch = new CountDownLatch(1);
+        SubscriberRegistration subscriberRegistration = new SubscriberRegistration(serviceName,
+                (dataId, data) -> {
+                    //record change
+                    printAddressData(dataId, data);
+                    handleRegistryData(data, listener, latch);
+                });
+
+        addAttributesForSub(subscriberRegistration);
+        listSubscriber = registryClient.register(subscriberRegistration);
+
+        subscribers.put(serviceName, listSubscriber);
+
+        waitAddress(serviceName, latch);
+    }
+
+    /**
+     * Blocks until the latch is released or {@code waitAddressTimeout} milliseconds elapse.
+     * <p>
+     * A timeout is logged as a warning. If the waiting thread is interrupted, the interrupt
+     * flag is restored before the error is logged, so callers can still observe it.
+     *
+     * @param serviceName    the data id being waited for, used in log messages only
+     * @param countDownLatch the latch released once address data has been handled
+     */
+    private void waitAddress(String serviceName, CountDownLatch countDownLatch) {
+        try {
+            if (!countDownLatch.await(waitAddressTimeout, TimeUnit.MILLISECONDS)) {
+                logger.warn("Subscribe data failed by dataId " + serviceName);
+            }
+        } catch (InterruptedException e) {
+            // await() clears the interrupt status when it throws; set it again rather than lose it.
+            Thread.currentThread().interrupt();
+            logger.error("Error when wait Address!", e);
+        } catch (Exception e) {
+            logger.error("Error when wait Address!", e);
+        }
+    }
+
+    /**
+     * Unregisters the subscriber for the service name derived from the URL and drops it
+     * from the local subscriber cache.
+     * <p>
+     * Skipped when the {@code SUBSCRIBE_KEY} parameter is {@code false} or the URL uses the
+     * provider protocol.
+     *
+     * @param url      the consumer url
+     * @param listener the listener being removed (not used by this implementation)
+     */
+    @Override
+    public void doUnsubscribe(URL url, NotifyListener listener) {
+        if (!url.getParameter(SUBSCRIBE_KEY, true)
+                || PROVIDER_PROTOCOL.equals(url.getProtocol())) {
+            return;
+        }
+        String serviceName = buildServiceName(url);
+
+        registryClient.unregister(serviceName, DEFAULT_GROUP, RegistryType.SUBSCRIBER);
+        // Forget the cached subscriber as well; otherwise a later doSubscribe for the same
+        // service would find the stale entry and never register a new subscription.
+        subscribers.remove(serviceName);
+    }
+
+    /**
+     * Converts registry data into provider URLs and passes them to the listener.
+     * <p>
+     * Each address string is parsed into a URL; when the URL carries a non-empty
+     * {@code APPLICATION_KEY} parameter, its value is copied into a {@code dstApp} parameter.
+     * {@code null} data results in an empty notification. The latch is always counted down,
+     * even if parsing or notification throws.
+     *
+     * @param data           the registry data, may be {@code null}
+     * @param notifyListener the listener to notify
+     * @param latch          the latch released after handling
+     */
+    private void handleRegistryData(UserData data, NotifyListener notifyListener,
+                                    CountDownLatch latch) {
+        try {
+            List<URL> providerUrls = new ArrayList<>();
+            if (data != null) {
+                for (String rawUrl : flatUserData(data)) {
+                    URL parsed = URL.valueOf(rawUrl);
+                    String providerApp = parsed.getParameter(APPLICATION_KEY);
+                    providerUrls.add(StringUtils.isNotEmpty(providerApp)
+                            ? parsed.addParameter("dstApp", providerApp)
+                            : parsed);
+                }
+            }
+            notifyListener.notify(providerUrls);
+        } finally {
+            latch.countDown();
+        }
+    }
+
+    /**
+     * Builds the data id used for a service: {@code interface[:version][:group]@DUBBO},
+     * where version and group are appended only when the URL has a non-empty value for them.
+     *
+     * @param url the service url
+     * @return the data id
+     */
+    private String buildServiceName(URL url) {
+        // Built by hand rather than taken from url.getServiceKey().
+        String name = url.getServiceInterface();
+
+        String version = url.getParameter(VERSION_KEY);
+        if (StringUtils.isNotEmpty(version)) {
+            name = name + ":" + version;
+        }
+
+        String group = url.getParameter(GROUP_KEY);
+        if (StringUtils.isNotEmpty(group)) {
+            name = name + ":" + group;
+        }
+
+        return name + "@" + DUBBO;
+    }
+
+    /**
+     * Logs, at INFO level, the addresses received for a data id.
+     * <p>
+     * Returns immediately when INFO logging is disabled, so the address list and the
+     * message are only built when they will actually be written.
+     *
+     * @param dataId   the data id
+     * @param userData the user data, may be {@code null} (logged as zero addresses)
+     */
+    protected void printAddressData(String dataId, UserData userData) {
+        if (!logger.isInfoEnabled()) {
+            return;
+        }
+
+        List<String> datas;
+        if (userData == null) {
+            datas = new ArrayList<>(0);
+        } else {
+            datas = flatUserData(userData);
+        }
+
+        StringBuilder sb = new StringBuilder();
+        for (String provider : datas) {
+            sb.append("  >>> ").append(provider).append("\n");
+        }
+        logger.info("Receive updated RPC service addresses: service[" + dataId
+                + "]\n  .Available target addresses size [" + datas.size() + "]\n"
+                + sb.toString());
+    }
+
+    /**
+     * Applies attributes to a subscriber registration before it is handed to the registry
+     * client. This implementation sets the group to {@code DEFAULT_GROUP} and the scope to
+     * {@code ScopeEnum.global}.
+     *
+     * @param subscriberRegistration the subscriber registration to modify
+     */
+    protected void addAttributesForSub(SubscriberRegistration subscriberRegistration) {
+        subscriberRegistration.setGroup(DEFAULT_GROUP);
+        subscriberRegistration.setScopeEnum(ScopeEnum.global);
+    }
+
+    /**
+     * Flattens the per-zone address lists of the user data into a single list.
+     * <p>
+     * A missing zone map or a {@code null} list for a zone is treated as "no addresses"
+     * instead of causing a {@link NullPointerException}.
+     *
+     * @param userData the user data, must not be {@code null}
+     * @return a new mutable list with the addresses of all zones; never {@code null}
+     */
+    protected List<String> flatUserData(UserData userData) {
+        List<String> result = new ArrayList<>();
+        Map<String, List<String>> zoneData = userData.getZoneData();
+        if (zoneData == null) {
+            return result;
+        }
+
+        // Only the values are needed; the zone names are not part of the result.
+        for (List<String> zoneAddresses : zoneData.values()) {
+            if (zoneAddresses != null) {
+                result.addAll(zoneAddresses);
+            }
+        }
+
+        return result;
+    }
+}
diff --git a/dubbo-registry-extensions/dubbo-registry-sofa/src/main/java/org/apache/dubbo/registry/sofa/SofaRegistryConstants.java b/dubbo-registry-extensions/dubbo-registry-sofa/src/main/java/org/apache/dubbo/registry/sofa/SofaRegistryConstants.java
new file mode 100644
index 0000000..f832e80
--- /dev/null
+++ b/dubbo-registry-extensions/dubbo-registry-sofa/src/main/java/org/apache/dubbo/registry/sofa/SofaRegistryConstants.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dubbo.registry.sofa;
+
+/**
+ * Constants used by the SOFA registry implementation in this package.
+ *
+ * @since 2.7.2
+ */
+public class SofaRegistryConstants {
+
+    /**
+     * Default data center; passed as the data center when the registry client config is built.
+     */
+    public static final String LOCAL_DATA_CENTER = "DefaultDataCenter";
+
+    /**
+     * Default region; passed as the zone when the registry client config is built.
+     */
+    public static final String LOCAL_REGION = "DEFAULT_ZONE";
+
+    /**
+     * Default group; set on publisher and subscriber registrations and used when unregistering.
+     */
+    public static final String DEFAULT_GROUP = "SOFA";
+
+    /**
+     * Name of the system property holding the address wait time of an rpc reference.
+     * {@code SofaRegistry} parses it as an integer number of milliseconds and uses 5000
+     * when the property is not set.
+     */
+    public static final String ADDRESS_WAIT_TIME_KEY = "rpc.reference.address.wait.time";
+}
diff --git a/dubbo-registry-extensions/dubbo-registry-sofa/src/main/java/org/apache/dubbo/registry/sofa/SofaRegistryFactory.java b/dubbo-registry-extensions/dubbo-registry-sofa/src/main/java/org/apache/dubbo/registry/sofa/SofaRegistryFactory.java
new file mode 100644
index 0000000..18a7809
--- /dev/null
+++ b/dubbo-registry-extensions/dubbo-registry-sofa/src/main/java/org/apache/dubbo/registry/sofa/SofaRegistryFactory.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dubbo.registry.sofa;
+
+import org.apache.dubbo.common.URL;
+import org.apache.dubbo.registry.Registry;
+import org.apache.dubbo.registry.support.AbstractRegistryFactory;
+
+/**
+ * Registry factory that creates {@link SofaRegistry} instances.
+ *
+ * @since 2.7.2
+ */
+public class SofaRegistryFactory extends AbstractRegistryFactory {
+
+    /**
+     * Runs the {@link #initEnvironment(URL)} hook and then creates the registry for the URL.
+     *
+     * @param url the registry url
+     * @return a new {@link SofaRegistry}
+     */
+    @Override
+    protected Registry createRegistry(URL url) {
+        // The hook must run before the registry (and with it the registry client) is built.
+        initEnvironment(url);
+        Registry sofaRegistry = new SofaRegistry(url);
+        return sofaRegistry;
+    }
+
+    /**
+     * Extension hook invoked before the registry is created, for example to load
+     * zone/accessKey/secretKey/... The default implementation does nothing.
+     *
+     * @param url URL
+     */
+    protected void initEnvironment(URL url) {
+        // no-op by default
+    }
+}
diff --git a/dubbo-registry-extensions/dubbo-registry-sofa/src/main/java/org/apache/dubbo/registry/sofa/SofaRegistryInstance.java b/dubbo-registry-extensions/dubbo-registry-sofa/src/main/java/org/apache/dubbo/registry/sofa/SofaRegistryInstance.java
new file mode 100644
index 0000000..65befdb
--- /dev/null
+++ b/dubbo-registry-extensions/dubbo-registry-sofa/src/main/java/org/apache/dubbo/registry/sofa/SofaRegistryInstance.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dubbo.registry.sofa;
+
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Mutable data holder describing one service instance: id, host, port, name and metadata.
+ */
+public class SofaRegistryInstance {
+    private String id;
+
+    private String host;
+
+    private int port;
+
+    private String name;
+
+    private Map<String, String> metadata = new HashMap<>();
+
+    /**
+     * No-arg constructor, not callable from outside this class.
+     * NOTE(review): presumably kept for reflective instantiation (e.g. deserialization) - confirm.
+     */
+    private SofaRegistryInstance() {
+    }
+
+    /**
+     * Creates an instance with all properties set.
+     *
+     * @param id       the instance id
+     * @param host     the host
+     * @param port     the port
+     * @param name     the name
+     * @param metadata the metadata map; stored as given, not copied
+     */
+    public SofaRegistryInstance(String id, String host, int port, String name, Map<String, String> metadata) {
+        this.id = id;
+        this.host = host;
+        this.port = port;
+        this.name = name;
+        this.metadata = metadata;
+    }
+
+    public String getHost() {
+        return host;
+    }
+
+    public void setHost(String host) {
+        this.host = host;
+    }
+
+    public int getPort() {
+        return port;
+    }
+
+    public void setPort(int port) {
+        this.port = port;
+    }
+
+    public String getId() {
+        return id;
+    }
+
+    public void setId(String id) {
+        this.id = id;
+    }
+
+    public String getName() {
+        return this.name;
+    }
+
+    public void setName(String name) {
+        this.name = name;
+    }
+
+    public Map<String, String> getMetadata() {
+        return this.metadata;
+    }
+
+    public void setMetadata(Map<String, String> metadata) {
+        this.metadata = metadata;
+    }
+
+    /**
+     * Returns all properties in the form
+     * {@code SofaRegistryInstance{id='..', host='..', port='..', name='..', metadata=..}},
+     * with every field separated by {@code ", "}.
+     */
+    @Override
+    public String toString() {
+        return "SofaRegistryInstance{" + "id='" + this.id + '\''
+                + ", host='" + this.host + '\''
+                + ", port='" + this.port + '\''
+                + ", name='" + this.name + '\''
+                + ", metadata=" + this.metadata + '}';
+    }
+}
diff --git a/dubbo-registry-extensions/dubbo-registry-sofa/src/main/java/org/apache/dubbo/registry/sofa/SofaRegistryServiceDiscovery.java b/dubbo-registry-extensions/dubbo-registry-sofa/src/main/java/org/apache/dubbo/registry/sofa/SofaRegistryServiceDiscovery.java
new file mode 100644
index 0000000..6867685
--- /dev/null
+++ b/dubbo-registry-extensions/dubbo-registry-sofa/src/main/java/org/apache/dubbo/registry/sofa/SofaRegistryServiceDiscovery.java
@@ -0,0 +1,301 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dubbo.registry.sofa;
+
+import org.apache.dubbo.common.URL;
+import org.apache.dubbo.common.logger.Logger;
+import org.apache.dubbo.common.logger.LoggerFactory;
+import org.apache.dubbo.registry.client.AbstractServiceDiscovery;
+import org.apache.dubbo.registry.client.DefaultServiceInstance;
+import org.apache.dubbo.registry.client.ServiceInstance;
+import org.apache.dubbo.registry.client.event.ServiceInstancesChangedEvent;
+import org.apache.dubbo.registry.client.event.listener.ServiceInstancesChangedListener;
+import org.apache.dubbo.rpc.model.ApplicationModel;
+
+import com.alipay.sofa.registry.client.api.Publisher;
+import com.alipay.sofa.registry.client.api.RegistryClientConfig;
+import com.alipay.sofa.registry.client.api.Subscriber;
+import com.alipay.sofa.registry.client.api.model.RegistryType;
+import com.alipay.sofa.registry.client.api.model.UserData;
+import com.alipay.sofa.registry.client.api.registration.PublisherRegistration;
+import com.alipay.sofa.registry.client.api.registration.SubscriberRegistration;
+import com.alipay.sofa.registry.client.provider.DefaultRegistryClient;
+import com.alipay.sofa.registry.client.provider.DefaultRegistryClientConfigBuilder;
+import com.alipay.sofa.registry.core.model.ScopeEnum;
+import com.google.gson.Gson;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.dubbo.registry.sofa.SofaRegistryConstants.ADDRESS_WAIT_TIME_KEY;
+import static org.apache.dubbo.registry.sofa.SofaRegistryConstants.LOCAL_DATA_CENTER;
+import static org.apache.dubbo.registry.sofa.SofaRegistryConstants.LOCAL_REGION;
+
+
+/**
+ * Service discovery backed by a SOFARegistry {@link DefaultRegistryClient}.
+ * <p>
+ * Instances are published as JSON-serialized {@link SofaRegistryInstance} payloads under the
+ * service name (group {@code "dubbo"}); subscriptions are tracked per service name through
+ * {@link SofaRegistryListener}.
+ */
+public class SofaRegistryServiceDiscovery extends AbstractServiceDiscovery {
+
+    private static final Logger LOGGER = LoggerFactory.getLogger(SofaRegistryServiceDiscovery.class);
+
+    private static final String DEFAULT_GROUP = "dubbo";
+
+    private final DefaultRegistryClient registryClient;
+
+    /** Maximum time, in milliseconds, to wait for the first data push after subscribing. */
+    private final int waitAddressTimeout;
+
+    private final RegistryClientConfig registryClientConfig;
+
+    /** Publishers keyed by service name. */
+    private final Map<String, Publisher> publishers = new ConcurrentHashMap<>();
+
+    /** Subscription holders keyed by service name. */
+    private final Map<String, SofaRegistryListener> sofaRegistryListenerMap = new ConcurrentHashMap<>();
+
+
+    private final Gson gson = new Gson();
+
+    /**
+     * Builds and initializes the registry client from the host and port of {@code registryURL}.
+     * The wait timeout is read from the system property named by {@code ADDRESS_WAIT_TIME_KEY}
+     * and defaults to 5000 ms.
+     *
+     * @param applicationModel the application model passed to the parent class
+     * @param registryURL      registry address; its host and port are used as the registry endpoint
+     */
+    public SofaRegistryServiceDiscovery(ApplicationModel applicationModel, URL registryURL) {
+        super(applicationModel, registryURL);
+
+        this.registryClientConfig = DefaultRegistryClientConfigBuilder.start()
+                .setDataCenter(LOCAL_DATA_CENTER)
+                .setZone(LOCAL_REGION)
+                .setRegistryEndpoint(registryURL.getHost())
+                .setRegistryEndpointPort(registryURL.getPort()).build();
+
+        registryClient = new DefaultRegistryClient(this.registryClientConfig);
+        registryClient.init();
+
+        this.waitAddressTimeout = Integer.parseInt(System.getProperty(ADDRESS_WAIT_TIME_KEY, "5000"));
+    }
+
+    @Override
+    public URL getUrl() {
+        return registryURL;
+    }
+
+    /**
+     * Stops every active subscription and forgets the listeners.
+     * A failure to stop one subscriber is logged and does not prevent the others from being stopped.
+     */
+    @Override
+    protected void doDestroy() throws Exception {
+        for (SofaRegistryListener sofaRegistryListener : sofaRegistryListenerMap.values()) {
+            try {
+                sofaRegistryListener.stop();
+            } catch (RuntimeException e) {
+                LOGGER.warn("Failed to stop sofa registry subscriber on destroy", e);
+            }
+        }
+        sofaRegistryListenerMap.clear();
+    }
+
+    /**
+     * Publishes the given instance under its service name. The first call for a service name
+     * registers a new publisher; later calls republish on the existing one.
+     *
+     * @param serviceInstance the instance to publish
+     */
+    @Override
+    public void doRegister(ServiceInstance serviceInstance) {
+        SofaRegistryInstance sofaRegistryInstance = new SofaRegistryInstance(UUID.randomUUID().toString(), serviceInstance.getHost(), serviceInstance.getPort(), serviceInstance.getServiceName(), serviceInstance.getMetadata());
+        Publisher publisher = publishers.get(serviceInstance.getServiceName());
+        this.serviceInstance = serviceInstance;
+        if (null == publisher) {
+            PublisherRegistration registration = new PublisherRegistration(serviceInstance.getServiceName());
+            registration.setGroup(DEFAULT_GROUP);
+            publisher = registryClient.register(registration, gson.toJson(sofaRegistryInstance));
+
+            publishers.put(serviceInstance.getServiceName(), publisher);
+        } else {
+            publisher.republish(gson.toJson(sofaRegistryInstance));
+        }
+    }
+
+    /**
+     * Unregisters the publisher of the given instance's service name.
+     *
+     * @param serviceInstance the instance to withdraw
+     */
+    @Override
+    protected void doUnregister(ServiceInstance serviceInstance) {
+        registryClient.unregister(serviceInstance.getServiceName(), DEFAULT_GROUP, RegistryType.PUBLISHER);
+        // Drop the cached publisher so a later doRegister creates a fresh registration
+        // instead of republishing on a publisher that has already been unregistered.
+        publishers.remove(serviceInstance.getServiceName());
+    }
+
+    @Override
+    public synchronized void addServiceInstancesChangedListener(ServiceInstancesChangedListener listener) throws NullPointerException, IllegalArgumentException {
+        listener.getServiceNames().forEach(serviceName -> registerServiceWatcher(serviceName, listener));
+    }
+
+    /**
+     * Attaches {@code listener} to the subscription of {@code serviceName}, creating and
+     * starting the subscription first if none exists yet.
+     *
+     * @param serviceName the service to watch
+     * @param listener    the listener to notify on instance changes
+     */
+    protected void registerServiceWatcher(String serviceName, ServiceInstancesChangedListener listener) {
+        SofaRegistryListener sofaRegistryListener = sofaRegistryListenerMap.get(serviceName);
+
+        if (null == sofaRegistryListener) {
+            sofaRegistryListener = new SofaRegistryListener(serviceName);
+            sofaRegistryListenerMap.put(serviceName, sofaRegistryListener);
+            // Add the listener before starting so it receives the first data push.
+            sofaRegistryListener.addListener(listener);
+            sofaRegistryListener.start();
+        } else {
+            sofaRegistryListener.addListener(listener);
+        }
+
+    }
+
+    /**
+     * Returns the instances last received for {@code serviceName}, subscribing first (and
+     * waiting up to the configured timeout) when no subscription exists yet.
+     *
+     * @param serviceName the service to look up
+     * @return the last known instances; empty, never {@code null}, when no data has arrived
+     */
+    @Override
+    public synchronized List<ServiceInstance> getInstances(String serviceName) throws NullPointerException {
+        SofaRegistryListener sofaRegistryListener = sofaRegistryListenerMap.get(serviceName);
+
+        if (null == sofaRegistryListener) {
+            sofaRegistryListener = new SofaRegistryListener(serviceName);
+            sofaRegistryListenerMap.put(serviceName, sofaRegistryListener);
+            sofaRegistryListener.start();
+        }
+
+        return sofaRegistryListener.peekData();
+    }
+
+    @Override
+    public synchronized void removeServiceInstancesChangedListener(ServiceInstancesChangedListener listener) throws IllegalArgumentException {
+        for (String serviceName : listener.getServiceNames()) {
+            SofaRegistryListener sofaRegistryListener = sofaRegistryListenerMap.get(serviceName);
+
+            if (null != sofaRegistryListener) {
+                sofaRegistryListener.removeListener(listener);
+
+                // Last listener gone: release the subscription as well.
+                if (sofaRegistryListener.getListenerCount() == 0) {
+                    sofaRegistryListener.stop();
+                    sofaRegistryListenerMap.remove(serviceName);
+                }
+            }
+        }
+    }
+
+    /**
+     * Holds the subscription for one service name, the most recently received instance list,
+     * and the listeners to notify when that list changes.
+     */
+    public class SofaRegistryListener {
+        private final String serviceName;
+        private volatile Subscriber subscriber;
+        private final List<ServiceInstancesChangedListener> listeners = new CopyOnWriteArrayList<>();
+
+        /** Last received instances; {@code null} until the first data push is handled. */
+        private volatile List<ServiceInstance> serviceInstances;
+
+        public SofaRegistryListener(String serviceName) {
+            this.serviceName = serviceName;
+        }
+
+        /**
+         * Registers the subscriber and blocks until the first data push has been handled
+         * or the wait timeout elapses.
+         */
+        public void start() {
+            final CountDownLatch latch = new CountDownLatch(1);
+            SubscriberRegistration subscriberRegistration = new SubscriberRegistration(serviceName, (dataId, data) -> {
+                handleRegistryData(dataId, data, latch);
+            });
+            subscriberRegistration.setGroup(DEFAULT_GROUP);
+            subscriberRegistration.setScopeEnum(ScopeEnum.global);
+
+            subscriber = registryClient.register(subscriberRegistration);
+
+            waitAddress(serviceName, latch);
+        }
+
+        /** Unregisters the subscriber if one was registered. */
+        public void stop() {
+            if (null != subscriber) {
+                subscriber.unregister();
+            }
+        }
+
+        /**
+         * Converts pushed data into service instances, stores them, and notifies listeners.
+         * The latch is always counted down, even when conversion or a listener fails.
+         */
+        private void handleRegistryData(String dataId, UserData userData, CountDownLatch latch) {
+            try {
+                List<String> datas = getUserData(dataId, userData);
+                List<ServiceInstance> newServiceInstances = new ArrayList<>(datas.size());
+
+                for (String serviceData : datas) {
+                    SofaRegistryInstance sri = gson.fromJson(serviceData, SofaRegistryInstance.class);
+                    if (null == sri) {
+                        // Gson returns null for a null or empty payload; nothing to convert.
+                        continue;
+                    }
+
+                    DefaultServiceInstance instance = new DefaultServiceInstance(dataId, sri.getHost(), sri.getPort(), applicationModel);
+                    instance.setMetadata(sri.getMetadata());
+                    newServiceInstances.add(instance);
+                }
+
+                this.serviceInstances = newServiceInstances;
+
+                // Use the local list so a concurrent push cannot change what this event carries.
+                for (ServiceInstancesChangedListener listener : listeners) {
+                    listener.onEvent(new ServiceInstancesChangedEvent(dataId, newServiceInstances));
+                }
+            } finally {
+                if (null != latch) {
+                    latch.countDown();
+                }
+            }
+        }
+
+        /**
+         * @return the last received instances, or an empty list when no data has arrived yet
+         */
+        public List<ServiceInstance> peekData() {
+            List<ServiceInstance> current = serviceInstances;
+            return null != current ? current : new ArrayList<>(0);
+        }
+
+        public void addListener(ServiceInstancesChangedListener listener) {
+            listeners.add(listener);
+        }
+
+        public void removeListener(ServiceInstancesChangedListener listener) {
+            listeners.remove(listener);
+        }
+
+        public int getListenerCount() {
+            return listeners.size();
+        }
+    }
+
+    /**
+     * Waits up to {@code waitAddressTimeout} milliseconds for the latch; logs a warning on
+     * timeout. If the waiting thread is interrupted, the interrupt flag is restored.
+     */
+    private void waitAddress(String serviceName, CountDownLatch countDownLatch) {
+        try {
+            if (!countDownLatch.await(waitAddressTimeout, TimeUnit.MILLISECONDS)) {
+                LOGGER.warn("Subscribe data failed by dataId " + serviceName);
+            }
+        } catch (InterruptedException e) {
+            // Preserve the interrupt for callers further up the stack.
+            Thread.currentThread().interrupt();
+            LOGGER.error("Error when wait Address!", e);
+        }
+    }
+
+    /**
+     * Flattens the pushed user data into a list of raw payload strings and, when INFO logging
+     * is enabled, logs the received entries.
+     *
+     * @param dataId   the data id
+     * @param userData the user data; may be {@code null}
+     * @return the flattened payloads; empty when {@code userData} is {@code null}
+     */
+    protected List<String> getUserData(String dataId, UserData userData) {
+
+        List<String> datas = null;
+        if (userData == null) {
+            datas = new ArrayList<>(0);
+        } else {
+            datas = flatUserData(userData);
+        }
+
+        if (LOGGER.isInfoEnabled()) {
+            // Only build the message when it will actually be logged.
+            StringBuilder sb = new StringBuilder();
+            for (String provider : datas) {
+                sb.append("  >>> ").append(provider).append("\n");
+            }
+            LOGGER.info("Receive updated RPC service addresses: service[" + dataId
+                    + "]\n  .Available target addresses size [" + datas.size() + "]\n"
+                    + sb.toString());
+        }
+
+        return datas;
+    }
+
+    /**
+     * Concatenates the payload lists of all zones into a single list.
+     *
+     * @param userData the user data
+     * @return all payloads across zones; empty when there is no zone data
+     */
+    protected List<String> flatUserData(UserData userData) {
+        List<String> result = new ArrayList<>();
+        Map<String, List<String>> zoneData = userData.getZoneData();
+        if (null == zoneData) {
+            return result;
+        }
+
+        for (List<String> zoneEntries : zoneData.values()) {
+            if (null != zoneEntries) {
+                result.addAll(zoneEntries);
+            }
+        }
+
+        return result;
+    }
+
+    /**
+     * @return the service names that currently have a subscription; this is the key-set view
+     * of the internal listener map, not a copy
+     */
+    @Override
+    public Set<String> getServices() {
+        return sofaRegistryListenerMap.keySet();
+    }
+
+}
diff --git a/dubbo-registry-extensions/dubbo-registry-sofa/src/main/java/org/apache/dubbo/registry/sofa/SofaRegistryServiceDiscoveryFactory.java b/dubbo-registry-extensions/dubbo-registry-sofa/src/main/java/org/apache/dubbo/registry/sofa/SofaRegistryServiceDiscoveryFactory.java
new file mode 100644
index 0000000..b3b03e5
--- /dev/null
+++ b/dubbo-registry-extensions/dubbo-registry-sofa/src/main/java/org/apache/dubbo/registry/sofa/SofaRegistryServiceDiscoveryFactory.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dubbo.registry.sofa;
+
+import org.apache.dubbo.common.URL;
+import org.apache.dubbo.registry.client.AbstractServiceDiscoveryFactory;
+import org.apache.dubbo.registry.client.ServiceDiscovery;
+
+/**
+ * Factory that produces {@link SofaRegistryServiceDiscovery} instances.
+ */
+public class SofaRegistryServiceDiscoveryFactory extends AbstractServiceDiscoveryFactory {
+
+    /**
+     * Creates a new discovery for the given registry address.
+     *
+     * @param registryURL the registry address handed to the discovery
+     * @return a new {@link SofaRegistryServiceDiscovery}
+     */
+    @Override
+    protected ServiceDiscovery createDiscovery(URL registryURL) {
+        final ServiceDiscovery discovery = new SofaRegistryServiceDiscovery(applicationModel, registryURL);
+        return discovery;
+    }
+}
diff --git a/dubbo-registry-extensions/dubbo-registry-sofa/src/main/resources/META-INF/dubbo/internal/org.apache.dubbo.registry.RegistryFactory b/dubbo-registry-extensions/dubbo-registry-sofa/src/main/resources/META-INF/dubbo/internal/org.apache.dubbo.registry.RegistryFactory
new file mode 100644
index 0000000..dff4cbb
--- /dev/null
+++ b/dubbo-registry-extensions/dubbo-registry-sofa/src/main/resources/META-INF/dubbo/internal/org.apache.dubbo.registry.RegistryFactory
@@ -0,0 +1 @@
+sofa=org.apache.dubbo.registry.sofa.SofaRegistryFactory
\ No newline at end of file
diff --git a/dubbo-registry-extensions/dubbo-registry-sofa/src/main/resources/META-INF/dubbo/internal/org.apache.dubbo.registry.client.ServiceDiscovery b/dubbo-registry-extensions/dubbo-registry-sofa/src/main/resources/META-INF/dubbo/internal/org.apache.dubbo.registry.client.ServiceDiscovery
new file mode 100644
index 0000000..4578cbd
--- /dev/null
+++ b/dubbo-registry-extensions/dubbo-registry-sofa/src/main/resources/META-INF/dubbo/internal/org.apache.dubbo.registry.client.ServiceDiscovery
@@ -0,0 +1 @@
+sofa=org.apache.dubbo.registry.sofa.SofaRegistryServiceDiscovery
\ No newline at end of file
diff --git a/dubbo-registry-extensions/dubbo-registry-sofa/src/main/resources/META-INF/dubbo/internal/org.apache.dubbo.registry.client.ServiceDiscoveryFactory b/dubbo-registry-extensions/dubbo-registry-sofa/src/main/resources/META-INF/dubbo/internal/org.apache.dubbo.registry.client.ServiceDiscoveryFactory
new file mode 100644
index 0000000..d16d75c
--- /dev/null
+++ b/dubbo-registry-extensions/dubbo-registry-sofa/src/main/resources/META-INF/dubbo/internal/org.apache.dubbo.registry.client.ServiceDiscoveryFactory
@@ -0,0 +1 @@
+sofa=org.apache.dubbo.registry.sofa.SofaRegistryServiceDiscoveryFactory
\ No newline at end of file
diff --git a/dubbo-registry-extensions/dubbo-registry-sofa/src/test/java/org/apache/dubbo/registry/sofa/HelloService.java b/dubbo-registry-extensions/dubbo-registry-sofa/src/test/java/org/apache/dubbo/registry/sofa/HelloService.java
new file mode 100644
index 0000000..cc66a5e
--- /dev/null
+++ b/dubbo-registry-extensions/dubbo-registry-sofa/src/test/java/org/apache/dubbo/registry/sofa/HelloService.java
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dubbo.registry.sofa;
+
+/**
+ * Minimal service interface used by the tests of this module.
+ */
+public interface HelloService {
+
+    /**
+     * Produces a greeting for the given name.
+     *
+     * @param name the name to greet
+     * @return the greeting text; the exact wording is up to the implementation
+     */
+    String sayHello(String name);
+}
diff --git a/dubbo-registry-extensions/dubbo-registry-sofa/src/test/java/org/apache/dubbo/registry/sofa/HelloServiceImpl.java b/dubbo-registry-extensions/dubbo-registry-sofa/src/test/java/org/apache/dubbo/registry/sofa/HelloServiceImpl.java
new file mode 100644
index 0000000..a9fb0ca
--- /dev/null
+++ b/dubbo-registry-extensions/dubbo-registry-sofa/src/test/java/org/apache/dubbo/registry/sofa/HelloServiceImpl.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dubbo.registry.sofa;
+
+import com.alipay.sofa.registry.server.test.TestRegistryMain;
+
+/**
+ * {@link HelloService} implementation that answers either with a fixed,
+ * pre-configured result or with a default greeting built from the name.
+ */
+public class HelloServiceImpl implements HelloService {
+
+    private final String result;
+
+    /**
+     * @param result fixed reply to return from {@link #sayHello(String)};
+     *               {@code null} selects the default greeting
+     */
+    public HelloServiceImpl(String result) {
+        this.result = result;
+    }
+
+    /**
+     * Returns the configured result when one was given, otherwise {@code "hello, <name>!"}.
+     */
+    @Override
+    public String sayHello(String name) {
+        if (result != null) {
+            return result;
+        }
+        return "hello, " + name + "!";
+    }
+
+    /**
+     * Starts a {@link TestRegistryMain} registry; a startup failure is printed to stderr.
+     */
+    public static void main(String[] args) {
+        TestRegistryMain registry = new TestRegistryMain();
+        try {
+            registry.startRegistry();
+        } catch (Exception startupFailure) {
+            startupFailure.printStackTrace();
+        }
+    }
+}
diff --git a/dubbo-registry-extensions/dubbo-registry-sofa/src/test/resources/log4j.properties b/dubbo-registry-extensions/dubbo-registry-sofa/src/test/resources/log4j.properties
new file mode 100644
index 0000000..8de4c4f
--- /dev/null
+++ b/dubbo-registry-extensions/dubbo-registry-sofa/src/test/resources/log4j.properties
@@ -0,0 +1,7 @@
+###set log levels###
+log4j.rootLogger=info, stdout
+###output to the console###
+log4j.appender.stdout=org.apache.log4j.ConsoleAppender
+log4j.appender.stdout.Target=System.out
+log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
+log4j.appender.stdout.layout.ConversionPattern=[%d{dd/MM/yy HH:mm:ss:SSS z}] %t %5p %c{2}: %m%n
diff --git a/dubbo-registry-extensions/pom.xml b/dubbo-registry-extensions/pom.xml
index 78ba1d7..6d8b87f 100644
--- a/dubbo-registry-extensions/pom.xml
+++ b/dubbo-registry-extensions/pom.xml
@@ -33,5 +33,9 @@
         <module>dubbo-registry-kubernetes</module>
         <module>dubbo-registry-dns</module>
         <module>dubbo-registry-xds</module>
+        <module>dubbo-registry-consul</module>
+        <module>dubbo-registry-etcd3</module>
+        <module>dubbo-registry-redis</module>
+        <module>dubbo-registry-sofa</module>
     </modules>
 </project>