You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dubbo.apache.org by al...@apache.org on 2022/07/21 09:28:39 UTC
[dubbo-spi-extensions] branch master updated: Enhance registry module (#134)
This is an automated email from the ASF dual-hosted git repository.
albumenj pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/dubbo-spi-extensions.git
The following commit(s) were added to refs/heads/master by this push:
new 04fc377 Enhance registry module (#134)
04fc377 is described below
commit 04fc3772f807000e20a955048270bac7794dd7d1
Author: Albumen Kevin <jh...@gmail.com>
AuthorDate: Thu Jul 21 17:28:35 2022 +0800
Enhance registry module (#134)
* add consul registry
* add etcd3 registry
* add eureka registry
* add redis registry
* add sofa registry
* opt pom
* try adapt consul
* fix consul
* try adapt etcd
* try tp adapt sofa
* remove eureka
* fix consul
* fix compile
* fix checkstyle
---
dubbo-extensions-dependencies-bom/pom.xml | 56 +++
.../dubbo-registry-consul/pom.xml | 63 +++
.../registry/consul/AbstractConsulRegistry.java | 35 ++
.../dubbo/registry/consul/ConsulConstants.java | 34 ++
.../dubbo/registry/consul/ConsulParameter.java | 87 ++++
.../dubbo/registry/consul/ConsulRegistry.java | 384 +++++++++++++++
.../registry/consul/ConsulRegistryFactory.java | 32 ++
.../registry/consul/ConsulServiceDiscovery.java | 469 +++++++++++++++++++
.../consul/ConsulServiceDiscoveryFactory.java | 30 ++
.../org.apache.dubbo.registry.RegistryFactory | 1 +
...e.dubbo.registry.client.ServiceDiscoveryFactory | 1 +
.../dubbo/registry/consul/ConsulRegistryTest.java | 151 ++++++
.../consul/ConsulServiceDiscoveryTest.java | 149 ++++++
.../dubbo-registry-etcd3/pom.xml | 51 ++
.../apache/dubbo/registry/etcd/EtcdRegistry.java | 365 +++++++++++++++
.../dubbo/registry/etcd/EtcdRegistryFactory.java | 36 ++
.../dubbo/registry/etcd/EtcdServiceDiscovery.java | 240 ++++++++++
.../registry/etcd/EtcdServiceDiscoveryFactory.java | 30 ++
.../org.apache.dubbo.registry.RegistryFactory | 1 +
...e.dubbo.registry.client.ServiceDiscoveryFactory | 1 +
.../dubbo/registry/etcd/EtcdRegistryTest.java | 329 +++++++++++++
.../registry/etcd/EtcdServiceDiscoveryTest.java | 124 +++++
.../dubbo-registry-redis/pom.xml | 59 +++
.../apache/dubbo/registry/redis/RedisRegistry.java | 519 +++++++++++++++++++++
.../dubbo/registry/redis/RedisRegistryFactory.java | 34 ++
.../org.apache.dubbo.registry.RegistryFactory | 1 +
.../dubbo/registry/redis/RedisRegistryTest.java | 250 ++++++++++
.../dubbo-registry-sofa/pom.xml | 130 ++++++
.../apache/dubbo/registry/sofa/SofaRegistry.java | 295 ++++++++++++
.../dubbo/registry/sofa/SofaRegistryConstants.java | 43 ++
.../dubbo/registry/sofa/SofaRegistryFactory.java | 41 ++
.../dubbo/registry/sofa/SofaRegistryInstance.java | 90 ++++
.../sofa/SofaRegistryServiceDiscovery.java | 301 ++++++++++++
.../sofa/SofaRegistryServiceDiscoveryFactory.java | 28 ++
.../org.apache.dubbo.registry.RegistryFactory | 1 +
...g.apache.dubbo.registry.client.ServiceDiscovery | 1 +
...e.dubbo.registry.client.ServiceDiscoveryFactory | 1 +
.../apache/dubbo/registry/sofa/HelloService.java | 24 +
.../dubbo/registry/sofa/HelloServiceImpl.java | 44 ++
.../src/test/resources/log4j.properties | 7 +
dubbo-registry-extensions/pom.xml | 4 +
41 files changed, 4542 insertions(+)
diff --git a/dubbo-extensions-dependencies-bom/pom.xml b/dubbo-extensions-dependencies-bom/pom.xml
index 41a1230..346535a 100644
--- a/dubbo-extensions-dependencies-bom/pom.xml
+++ b/dubbo-extensions-dependencies-bom/pom.xml
@@ -128,6 +128,11 @@
<consul_client_version>1.3.7</consul_client_version>
<test_container_version>1.15.2</test_container_version>
<seata_version>1.5.2</seata_version>
+ <eureka.version>1.9.12</eureka.version>
+ <sofa_registry_version>5.2.0</sofa_registry_version>
+ <logback_version>1.2.11</logback_version>
+ <rs_api_version>2.0</rs_api_version>
+ <resteasy_version>3.0.19.Final</resteasy_version>
<maven_flatten_version>1.2.5</maven_flatten_version>
</properties>
@@ -427,6 +432,57 @@
<artifactId>seata-core</artifactId>
<version>${seata_version}</version>
</dependency>
+ <dependency>
+ <groupId>com.netflix.eureka</groupId>
+ <artifactId>eureka-client</artifactId>
+ <version>${eureka.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>com.netflix.eureka</groupId>
+ <artifactId>eureka-core</artifactId>
+ <version>${eureka.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>com.alipay.sofa</groupId>
+ <artifactId>registry-client-all</artifactId>
+ <version>${sofa_registry_version}</version>
+ </dependency>
+ <dependency>
+ <groupId>com.alipay.sofa</groupId>
+ <artifactId>registry-test</artifactId>
+ <version>${sofa_registry_version}</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>ch.qos.logback</groupId>
+ <artifactId>logback-classic</artifactId>
+ <version>${logback_version}</version>
+ </dependency>
+ <dependency>
+ <groupId>javax.ws.rs</groupId>
+ <artifactId>javax.ws.rs-api</artifactId>
+ <version>${rs_api_version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.jboss.resteasy</groupId>
+ <artifactId>resteasy-jaxrs</artifactId>
+ <version>${resteasy_version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.jboss.resteasy</groupId>
+ <artifactId>resteasy-client</artifactId>
+ <version>${resteasy_version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.jboss.resteasy</groupId>
+ <artifactId>resteasy-netty4</artifactId>
+ <version>${resteasy_version}</version>
+ <exclusions>
+ <exclusion>
+ <groupId>io.netty</groupId>
+ <artifactId>netty-all</artifactId>
+ </exclusion>
+ </exclusions>
</dependencies>
</dependencyManagement>
diff --git a/dubbo-registry-extensions/dubbo-registry-consul/pom.xml b/dubbo-registry-extensions/dubbo-registry-consul/pom.xml
new file mode 100644
index 0000000..8674ed4
--- /dev/null
+++ b/dubbo-registry-extensions/dubbo-registry-consul/pom.xml
@@ -0,0 +1,63 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ ~ Licensed to the Apache Software Foundation (ASF) under one or more
+ ~ contributor license agreements. See the NOTICE file distributed with
+ ~ this work for additional information regarding copyright ownership.
+ ~ The ASF licenses this file to You under the Apache License, Version 2.0
+ ~ (the "License"); you may not use this file except in compliance with
+ ~ the License. You may obtain a copy of the License at
+ ~
+ ~ http://www.apache.org/licenses/LICENSE-2.0
+ ~
+ ~ Unless required by applicable law or agreed to in writing, software
+ ~ distributed under the License is distributed on an "AS IS" BASIS,
+ ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ ~ See the License for the specific language governing permissions and
+ ~ limitations under the License.
+ -->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <parent>
+ <artifactId>dubbo-registry-extensions</artifactId>
+ <groupId>org.apache.dubbo.extensions</groupId>
+ <version>${revision}</version>
+ <relativePath>../pom.xml</relativePath>
+ </parent>
+ <modelVersion>4.0.0</modelVersion>
+
+ <version>1.0.0-SNAPSHOT</version>
+ <artifactId>dubbo-registry-consul</artifactId>
+
+ <properties>
+ <skipIntegrationTests>true</skipIntegrationTests>
+ </properties>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.dubbo</groupId>
+ <artifactId>dubbo-registry-api</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>com.ecwid.consul</groupId>
+ <artifactId>consul-api</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>com.pszymczyk.consul</groupId>
+ <artifactId>embedded-consul</artifactId>
+ <scope>test</scope>
+ </dependency>
+ </dependencies>
+
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-surefire-plugin</artifactId>
+ <configuration>
+ <skipTests>${skipIntegrationTests}</skipTests>
+ </configuration>
+ </plugin>
+ </plugins>
+ </build>
+
+</project>
diff --git a/dubbo-registry-extensions/dubbo-registry-consul/src/main/java/org/apache/dubbo/registry/consul/AbstractConsulRegistry.java b/dubbo-registry-extensions/dubbo-registry-consul/src/main/java/org/apache/dubbo/registry/consul/AbstractConsulRegistry.java
new file mode 100644
index 0000000..f18bc45
--- /dev/null
+++ b/dubbo-registry-extensions/dubbo-registry-consul/src/main/java/org/apache/dubbo/registry/consul/AbstractConsulRegistry.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dubbo.registry.consul;
+
+/**
+ * @author cvictory ON 2019-08-02
+ */
+public class AbstractConsulRegistry {
+
+ static final String SERVICE_TAG = "dubbo";
+ static final String URL_META_KEY = "url";
+ static final String CHECK_PASS_INTERVAL = "consul-check-pass-interval";
+ static final String DEREGISTER_AFTER = "consul-deregister-critical-service-after";
+
+ static final long DEFAULT_CHECK_PASS_INTERVAL = 16000L;
+ // default deregister critical server after
+ static final String DEFAULT_DEREGISTER_TIME = "20s";
+
+ static final int PERIOD_DENOMINATOR = 8;
+ static final int ONE_THOUSAND = 1000;
+}
diff --git a/dubbo-registry-extensions/dubbo-registry-consul/src/main/java/org/apache/dubbo/registry/consul/ConsulConstants.java b/dubbo-registry-extensions/dubbo-registry-consul/src/main/java/org/apache/dubbo/registry/consul/ConsulConstants.java
new file mode 100644
index 0000000..1976f30
--- /dev/null
+++ b/dubbo-registry-extensions/dubbo-registry-consul/src/main/java/org/apache/dubbo/registry/consul/ConsulConstants.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dubbo.registry.consul;
+
+/**
+ * Common configuration for configCenter, metadata, and registry modules
+ */
+public interface ConsulConstants {
+
+ int DEFAULT_PORT = 8500;
+
+ int DEFAULT_WATCH_TIMEOUT = 60 * 1000;
+
+ String WATCH_TIMEOUT = "consul-watch-timeout";
+
+ int INVALID_PORT = 0;
+
+ String TOKEN = "token";
+}
diff --git a/dubbo-registry-extensions/dubbo-registry-consul/src/main/java/org/apache/dubbo/registry/consul/ConsulParameter.java b/dubbo-registry-extensions/dubbo-registry-consul/src/main/java/org/apache/dubbo/registry/consul/ConsulParameter.java
new file mode 100644
index 0000000..f3fb024
--- /dev/null
+++ b/dubbo-registry-extensions/dubbo-registry-consul/src/main/java/org/apache/dubbo/registry/consul/ConsulParameter.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dubbo.registry.consul;
+
+import org.apache.dubbo.common.URL;
+
+import static org.apache.dubbo.common.utils.StringUtils.isBlank;
+
+/**
+ * The enumeration for the Consul's parameters on the {@link URL}
+ *
+ * @see URL#getParameters()
+ * @since 2.7.8
+ */
+public enum ConsulParameter {
+
+ ACL_TOKEN,
+
+ TAGS,
+
+ INSTANCE_ZONE,
+
+ DEFAULT_ZONE_METADATA_NAME("zone"),
+
+ INSTANCE_GROUP,
+
+ CONSISTENCY_MODE,
+
+ ;
+
+ private final String name;
+
+ private final String defaultValue;
+
+ ConsulParameter() {
+ this(null);
+ }
+
+ ConsulParameter(String defaultValue) {
+ this(null, defaultValue);
+ }
+
+ ConsulParameter(String name, String defaultValue) {
+ this.name = isBlank(name) ? defaultName() : name;
+ this.defaultValue = defaultValue;
+ }
+
+ private String defaultName() {
+ return name().toLowerCase().replace('_', '-');
+ }
+
+ /**
+ * The parameter value from the specified registry {@link URL}
+ *
+ * @param registryURL the specified registry {@link URL}
+ * @return <code>defaultValue</code> if not found
+ */
+ public String getValue(URL registryURL) {
+ return registryURL.getParameter(name, defaultValue);
+ }
+
+ /**
+ * The parameter value from the specified registry {@link URL}
+ *
+ * @param registryURL the specified registry {@link URL}
+ * @param valueType the type of parameter value
+ * @param defaultValue the default value if parameter is absent
+ * @return <code>defaultValue</code> if not found
+ */
+ public <T> T getValue(URL registryURL, Class<T> valueType, T defaultValue) {
+ return registryURL.getParameter(name, valueType, defaultValue);
+ }
+}
diff --git a/dubbo-registry-extensions/dubbo-registry-consul/src/main/java/org/apache/dubbo/registry/consul/ConsulRegistry.java b/dubbo-registry-extensions/dubbo-registry-consul/src/main/java/org/apache/dubbo/registry/consul/ConsulRegistry.java
new file mode 100644
index 0000000..2338907
--- /dev/null
+++ b/dubbo-registry-extensions/dubbo-registry-consul/src/main/java/org/apache/dubbo/registry/consul/ConsulRegistry.java
@@ -0,0 +1,384 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dubbo.registry.consul;
+
+import org.apache.dubbo.common.URL;
+import org.apache.dubbo.common.URLBuilder;
+import org.apache.dubbo.common.logger.Logger;
+import org.apache.dubbo.common.logger.LoggerFactory;
+import org.apache.dubbo.common.utils.CollectionUtils;
+import org.apache.dubbo.common.utils.NamedThreadFactory;
+import org.apache.dubbo.common.utils.UrlUtils;
+import org.apache.dubbo.registry.NotifyListener;
+import org.apache.dubbo.registry.support.FailbackRegistry;
+import org.apache.dubbo.rpc.RpcException;
+
+import com.ecwid.consul.v1.ConsulClient;
+import com.ecwid.consul.v1.QueryParams;
+import com.ecwid.consul.v1.Response;
+import com.ecwid.consul.v1.agent.model.NewService;
+import com.ecwid.consul.v1.catalog.CatalogServicesRequest;
+import com.ecwid.consul.v1.health.HealthServicesRequest;
+import com.ecwid.consul.v1.health.model.HealthService;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+import static java.util.concurrent.Executors.newCachedThreadPool;
+import static org.apache.dubbo.common.constants.CommonConstants.ANY_VALUE;
+import static org.apache.dubbo.common.constants.RegistryConstants.CATEGORY_KEY;
+import static org.apache.dubbo.common.constants.RegistryConstants.EMPTY_PROTOCOL;
+import static org.apache.dubbo.registry.Constants.CONSUMER_PROTOCOL;
+import static org.apache.dubbo.registry.Constants.PROVIDER_PROTOCOL;
+import static org.apache.dubbo.registry.consul.AbstractConsulRegistry.CHECK_PASS_INTERVAL;
+import static org.apache.dubbo.registry.consul.AbstractConsulRegistry.DEFAULT_CHECK_PASS_INTERVAL;
+import static org.apache.dubbo.registry.consul.AbstractConsulRegistry.DEFAULT_DEREGISTER_TIME;
+import static org.apache.dubbo.registry.consul.AbstractConsulRegistry.DEREGISTER_AFTER;
+import static org.apache.dubbo.registry.consul.AbstractConsulRegistry.ONE_THOUSAND;
+import static org.apache.dubbo.registry.consul.AbstractConsulRegistry.PERIOD_DENOMINATOR;
+import static org.apache.dubbo.registry.consul.AbstractConsulRegistry.SERVICE_TAG;
+import static org.apache.dubbo.registry.consul.AbstractConsulRegistry.URL_META_KEY;
+import static org.apache.dubbo.rpc.Constants.TOKEN_KEY;
+
+/**
+ * registry center implementation for consul
+ */
+public class ConsulRegistry extends FailbackRegistry {
+ private static final Logger logger = LoggerFactory.getLogger(ConsulRegistry.class);
+
+ private ConsulClient client;
+ private long checkPassInterval;
+ private ExecutorService notifierExecutor = newCachedThreadPool(
+ new NamedThreadFactory("dubbo-consul-notifier", true));
+ private ConcurrentMap<URL, ConsulNotifier> notifiers = new ConcurrentHashMap<>();
+ private ScheduledExecutorService ttlConsulCheckExecutor;
+ /**
+ * The ACL token
+ */
+ private String token;
+
+ private static final int CONSUL_CORE_THREAD_SIZE = 1;
+
+ private static final int DEFAULT_INDEX = -1;
+ private static final int DEFAULT_WAIT_TIME = -1;
+
+
+ public ConsulRegistry(URL url) {
+ super(url);
+ token = url.getParameter(TOKEN_KEY, (String) null);
+ String host = url.getHost();
+ int port = ConsulConstants.INVALID_PORT != url.getPort() ? url.getPort() : ConsulConstants.DEFAULT_PORT;
+ client = new ConsulClient(host, port);
+ checkPassInterval = url.getParameter(CHECK_PASS_INTERVAL, DEFAULT_CHECK_PASS_INTERVAL);
+ ttlConsulCheckExecutor = new ScheduledThreadPoolExecutor(CONSUL_CORE_THREAD_SIZE, new NamedThreadFactory("Ttl-Consul-Check-Executor", true));
+ ttlConsulCheckExecutor.scheduleAtFixedRate(this::checkPass, checkPassInterval / PERIOD_DENOMINATOR,
+ checkPassInterval / PERIOD_DENOMINATOR, TimeUnit.MILLISECONDS);
+ }
+
+ @Override
+ public void register(URL url) {
+ if (isConsumerSide(url)) {
+ return;
+ }
+
+ super.register(url);
+ }
+
+ @Override
+ public void doRegister(URL url) {
+ if (token == null) {
+ client.agentServiceRegister(buildService(url));
+ } else {
+ client.agentServiceRegister(buildService(url), token);
+ }
+ }
+
+ @Override
+ public void unregister(URL url) {
+ if (isConsumerSide(url)) {
+ return;
+ }
+
+ super.unregister(url);
+ }
+
+ @Override
+ public void doUnregister(URL url) {
+ if (token == null) {
+ client.agentServiceDeregister(buildId(url));
+ } else {
+ client.agentServiceDeregister(buildId(url), token);
+ }
+ }
+
+ @Override
+ public void subscribe(URL url, NotifyListener listener) {
+ if (isProviderSide(url)) {
+ return;
+ }
+
+ super.subscribe(url, listener);
+ }
+
+ @Override
+ public void doSubscribe(URL url, NotifyListener listener) {
+ Long index;
+ List<URL> urls;
+ if (ANY_VALUE.equals(url.getServiceInterface())) {
+ Response<Map<String, List<String>>> response = getAllServices(DEFAULT_INDEX, buildWatchTimeout(url));
+ index = response.getConsulIndex();
+ List<HealthService> services = getHealthServices(response.getValue());
+ urls = convert(services, url);
+ } else {
+ String service = url.getServiceInterface();
+ Response<List<HealthService>> response = getHealthServices(service, DEFAULT_INDEX, buildWatchTimeout(url));
+ index = response.getConsulIndex();
+ urls = convert(response.getValue(), url);
+ }
+
+ notify(url, listener, urls);
+ ConsulNotifier notifier = notifiers.computeIfAbsent(url, k -> new ConsulNotifier(url, index));
+ notifierExecutor.submit(notifier);
+ }
+
+ @Override
+ public void unsubscribe(URL url, NotifyListener listener) {
+ if (isProviderSide(url)) {
+ return;
+ }
+
+ super.unsubscribe(url, listener);
+ }
+
+ @Override
+ public void doUnsubscribe(URL url, NotifyListener listener) {
+ ConsulNotifier notifier = notifiers.remove(url);
+ notifier.stop();
+ }
+
+ @Override
+ public List<URL> lookup(URL url) {
+ if (url == null) {
+ throw new IllegalArgumentException("lookup url == null");
+ }
+ try {
+ String service = url.getServiceKey();
+ Response<List<HealthService>> result = getHealthServices(service, DEFAULT_INDEX, buildWatchTimeout(url));
+ if (result == null || result.getValue() == null || result.getValue().isEmpty()) {
+ return new ArrayList<>();
+ } else {
+ return convert(result.getValue(), url);
+ }
+ } catch (Throwable e) {
+ throw new RpcException("Failed to lookup " + url + " from consul " + getUrl() + ", cause: " + e.getMessage(), e);
+ }
+ }
+
+ @Override
+ public boolean isAvailable() {
+ return client.getAgentSelf() != null;
+ }
+
+ @Override
+ public void destroy() {
+ super.destroy();
+ notifierExecutor.shutdown();
+ ttlConsulCheckExecutor.shutdown();
+ }
+
+ private void checkPass() {
+ for (URL url : getRegistered()) {
+ String checkId = buildId(url);
+ try {
+ if (token == null) {
+ client.agentCheckPass("service:" + checkId);
+ } else {
+ client.agentCheckPass("service:" + checkId, null, token);
+ }
+ if (logger.isDebugEnabled()) {
+ logger.debug("check pass for url: " + url + " with check id: " + checkId);
+ }
+ } catch (Throwable t) {
+ logger.warn("fail to check pass for url: " + url + ", check id is: " + checkId, t);
+ }
+ }
+ }
+
+ private Response<List<HealthService>> getHealthServices(String service, long index, int watchTimeout) {
+ HealthServicesRequest request = HealthServicesRequest.newBuilder()
+ .setTag(SERVICE_TAG)
+ .setQueryParams(new QueryParams(watchTimeout, index))
+ .setPassing(true)
+ .setToken(token)
+ .build();
+ return client.getHealthServices(service, request);
+ }
+
+ private Response<Map<String, List<String>>> getAllServices(long index, int watchTimeout) {
+ CatalogServicesRequest request = CatalogServicesRequest.newBuilder()
+ .setQueryParams(new QueryParams(watchTimeout, index))
+ .setToken(token)
+ .build();
+ return client.getCatalogServices(request);
+ }
+
+ private List<HealthService> getHealthServices(Map<String, List<String>> services) {
+ return services.entrySet().stream()
+ .filter(s -> s.getValue().contains(SERVICE_TAG))
+ .map(s -> getHealthServices(s.getKey(), DEFAULT_INDEX, DEFAULT_WAIT_TIME).getValue())
+ .flatMap(Collection::stream)
+ .collect(Collectors.toList());
+ }
+
+
+ private boolean isConsumerSide(URL url) {
+ return url.getProtocol().equals(CONSUMER_PROTOCOL);
+ }
+
+ private boolean isProviderSide(URL url) {
+ return url.getProtocol().equals(PROVIDER_PROTOCOL);
+ }
+
+ private List<URL> convert(List<HealthService> services, URL consumerURL) {
+ if (CollectionUtils.isEmpty(services)) {
+ return emptyURL(consumerURL);
+ }
+ return services.stream()
+ .map(HealthService::getService)
+ .filter(Objects::nonNull)
+ .map(HealthService.Service::getMeta)
+ .filter(m -> m != null && m.containsKey(URL_META_KEY))
+ .map(m -> m.get(URL_META_KEY))
+ .map(URL::valueOf)
+ .filter(url -> UrlUtils.isMatch(consumerURL, url))
+ .collect(Collectors.toList());
+ }
+
+ private List<URL> emptyURL(URL consumerURL) {
+ // No Category Parameter
+ URL empty = URLBuilder.from(consumerURL)
+ .setProtocol(EMPTY_PROTOCOL)
+ .removeParameter(CATEGORY_KEY)
+ .build();
+ List<URL> result = new ArrayList<URL>();
+ result.add(empty);
+ return result;
+ }
+
+ private NewService buildService(URL url) {
+ NewService service = new NewService();
+ service.setAddress(url.getHost());
+ service.setPort(url.getPort());
+ service.setId(buildId(url));
+ service.setName(url.getServiceInterface());
+ service.setCheck(buildCheck(url));
+ service.setTags(buildTags(url));
+ service.setMeta(Collections.singletonMap(URL_META_KEY, url.toFullString()));
+ return service;
+ }
+
+ private List<String> buildTags(URL url) {
+ Map<String, String> params = url.getParameters();
+ List<String> tags = params.entrySet().stream()
+ .map(k -> k.getKey() + "=" + k.getValue())
+ .collect(Collectors.toList());
+ tags.add(SERVICE_TAG);
+ return tags;
+ }
+
+ private String buildId(URL url) {
+ // let's simply use url's hashcode to generate unique service id for now
+ return Integer.toHexString(url.hashCode());
+ }
+
+ private NewService.Check buildCheck(URL url) {
+ NewService.Check check = new NewService.Check();
+ check.setTtl((checkPassInterval / ONE_THOUSAND) + "s");
+ check.setDeregisterCriticalServiceAfter(url.getParameter(DEREGISTER_AFTER, DEFAULT_DEREGISTER_TIME));
+ return check;
+ }
+
+ private int buildWatchTimeout(URL url) {
+ return url.getParameter(ConsulConstants.WATCH_TIMEOUT, ConsulConstants.DEFAULT_WATCH_TIMEOUT) / ONE_THOUSAND;
+ }
+
+ private class ConsulNotifier implements Runnable {
+ private URL url;
+ private long consulIndex;
+ private boolean running;
+
+ ConsulNotifier(URL url, long consulIndex) {
+ this.url = url;
+ this.consulIndex = consulIndex;
+ this.running = true;
+ }
+
+ @Override
+ public void run() {
+ while (this.running) {
+ if (ANY_VALUE.equals(url.getServiceInterface())) {
+ processServices();
+ } else {
+ processService();
+ }
+ }
+ }
+
+ private void processService() {
+ String service = url.getServiceKey();
+ Response<List<HealthService>> response = getHealthServices(service, consulIndex, buildWatchTimeout(url));
+ Long currentIndex = response.getConsulIndex();
+ if (currentIndex != null && currentIndex > consulIndex) {
+ consulIndex = currentIndex;
+ List<HealthService> services = response.getValue();
+ List<URL> urls = convert(services, url);
+ for (NotifyListener listener : getSubscribed().get(url)) {
+ doNotify(url, listener, urls);
+ }
+ }
+ }
+
+ private void processServices() {
+ Response<Map<String, List<String>>> response = getAllServices(consulIndex, buildWatchTimeout(url));
+ Long currentIndex = response.getConsulIndex();
+ if (currentIndex != null && currentIndex > consulIndex) {
+ consulIndex = currentIndex;
+ List<HealthService> services = getHealthServices(response.getValue());
+ List<URL> urls = convert(services, url);
+ for (NotifyListener listener : getSubscribed().get(url)) {
+ doNotify(url, listener, urls);
+ }
+ }
+ }
+
+ void stop() {
+ this.running = false;
+ }
+ }
+}
diff --git a/dubbo-registry-extensions/dubbo-registry-consul/src/main/java/org/apache/dubbo/registry/consul/ConsulRegistryFactory.java b/dubbo-registry-extensions/dubbo-registry-consul/src/main/java/org/apache/dubbo/registry/consul/ConsulRegistryFactory.java
new file mode 100644
index 0000000..c36f009
--- /dev/null
+++ b/dubbo-registry-extensions/dubbo-registry-consul/src/main/java/org/apache/dubbo/registry/consul/ConsulRegistryFactory.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dubbo.registry.consul;
+
+import org.apache.dubbo.common.URL;
+import org.apache.dubbo.registry.Registry;
+import org.apache.dubbo.registry.support.AbstractRegistryFactory;
+
+/**
+ * registry center factory implementation for consul
+ */
+public class ConsulRegistryFactory extends AbstractRegistryFactory {
+ @Override
+ protected Registry createRegistry(URL url) {
+ return new ConsulRegistry(url);
+ }
+}
diff --git a/dubbo-registry-extensions/dubbo-registry-consul/src/main/java/org/apache/dubbo/registry/consul/ConsulServiceDiscovery.java b/dubbo-registry-extensions/dubbo-registry-consul/src/main/java/org/apache/dubbo/registry/consul/ConsulServiceDiscovery.java
new file mode 100644
index 0000000..2771a99
--- /dev/null
+++ b/dubbo-registry-extensions/dubbo-registry-consul/src/main/java/org/apache/dubbo/registry/consul/ConsulServiceDiscovery.java
@@ -0,0 +1,469 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dubbo.registry.consul;
+
+import org.apache.dubbo.common.URL;
+import org.apache.dubbo.common.logger.Logger;
+import org.apache.dubbo.common.logger.LoggerFactory;
+import org.apache.dubbo.common.utils.CollectionUtils;
+import org.apache.dubbo.common.utils.NamedThreadFactory;
+import org.apache.dubbo.common.utils.StringUtils;
+import org.apache.dubbo.registry.client.AbstractServiceDiscovery;
+import org.apache.dubbo.registry.client.DefaultServiceInstance;
+import org.apache.dubbo.registry.client.ServiceInstance;
+import org.apache.dubbo.registry.client.event.ServiceInstancesChangedEvent;
+import org.apache.dubbo.registry.client.event.listener.ServiceInstancesChangedListener;
+import org.apache.dubbo.rpc.model.ApplicationModel;
+
+import com.ecwid.consul.v1.ConsistencyMode;
+import com.ecwid.consul.v1.ConsulClient;
+import com.ecwid.consul.v1.QueryParams;
+import com.ecwid.consul.v1.Response;
+import com.ecwid.consul.v1.agent.model.NewService;
+import com.ecwid.consul.v1.catalog.CatalogServicesRequest;
+import com.ecwid.consul.v1.health.HealthServicesRequest;
+import com.ecwid.consul.v1.health.model.HealthService;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Base64;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+import static java.util.concurrent.Executors.newCachedThreadPool;
+import static org.apache.dubbo.common.constants.CommonConstants.COMMA_SEPARATOR_CHAR;
+import static org.apache.dubbo.common.constants.CommonConstants.SEMICOLON_SPLIT_PATTERN;
+import static org.apache.dubbo.registry.consul.AbstractConsulRegistry.CHECK_PASS_INTERVAL;
+import static org.apache.dubbo.registry.consul.AbstractConsulRegistry.DEFAULT_CHECK_PASS_INTERVAL;
+import static org.apache.dubbo.registry.consul.AbstractConsulRegistry.DEFAULT_DEREGISTER_TIME;
+import static org.apache.dubbo.registry.consul.AbstractConsulRegistry.DEREGISTER_AFTER;
+import static org.apache.dubbo.registry.consul.AbstractConsulRegistry.ONE_THOUSAND;
+import static org.apache.dubbo.registry.consul.AbstractConsulRegistry.PERIOD_DENOMINATOR;
+import static org.apache.dubbo.registry.consul.ConsulConstants.DEFAULT_WATCH_TIMEOUT;
+import static org.apache.dubbo.registry.consul.ConsulConstants.WATCH_TIMEOUT;
+import static org.apache.dubbo.registry.consul.ConsulParameter.ACL_TOKEN;
+import static org.apache.dubbo.registry.consul.ConsulParameter.CONSISTENCY_MODE;
+import static org.apache.dubbo.registry.consul.ConsulParameter.DEFAULT_ZONE_METADATA_NAME;
+import static org.apache.dubbo.registry.consul.ConsulParameter.INSTANCE_GROUP;
+import static org.apache.dubbo.registry.consul.ConsulParameter.INSTANCE_ZONE;
+import static org.apache.dubbo.registry.consul.ConsulParameter.TAGS;
+
+public class ConsulServiceDiscovery extends AbstractServiceDiscovery {
+
+ private static final String QUERY_TAG = "consul_query_tag";
+ private static final String REGISTER_TAG = "consul_register_tag";
+
+ private List<String> registeringTags = new ArrayList<>();
+ private String tag;
+ private ConsulClient client;
+ private ExecutorService notifierExecutor = newCachedThreadPool(
+ new NamedThreadFactory("dubbo-service-discovery-consul-notifier", true));
+ private Map<String, ConsulNotifier> notifiers = new ConcurrentHashMap<>();
+ private TtlScheduler ttlScheduler;
+ private long checkPassInterval;
+ private URL url;
+
+ private String aclToken;
+
+ private List<String> tags;
+
+ private ConsistencyMode consistencyMode;
+
+ private String defaultZoneMetadataName;
+
+ /**
+ * Service instance zone.
+ */
+ private String instanceZone;
+
+ /**
+ * Service instance group.
+ */
+ private String instanceGroup;
+
+ public ConsulServiceDiscovery(ApplicationModel applicationModel, URL registryURL) {
+ super(applicationModel, registryURL);
+ this.url = registryURL;
+ String host = url.getHost();
+ int port = ConsulConstants.INVALID_PORT != url.getPort() ? url.getPort() : ConsulConstants.DEFAULT_PORT;
+ checkPassInterval = url.getParameter(CHECK_PASS_INTERVAL, DEFAULT_CHECK_PASS_INTERVAL);
+ client = new ConsulClient(host, port);
+ ttlScheduler = new TtlScheduler(checkPassInterval, client);
+ this.tag = registryURL.getParameter(QUERY_TAG);
+ this.registeringTags.addAll(getRegisteringTags(url));
+ this.aclToken = ACL_TOKEN.getValue(registryURL);
+ this.tags = getTags(registryURL);
+ this.consistencyMode = getConsistencyMode(registryURL);
+ this.defaultZoneMetadataName = DEFAULT_ZONE_METADATA_NAME.getValue(registryURL);
+ this.instanceZone = INSTANCE_ZONE.getValue(registryURL);
+ this.instanceGroup = INSTANCE_GROUP.getValue(registryURL);
+ }
+
+ /**
+ * Get the {@link ConsistencyMode}
+ *
+ * @param registryURL the {@link URL} of registry
+ * @return non-null, {@link ConsistencyMode#DEFAULT} as default
+ * @sine 2.7.8
+ */
+ private ConsistencyMode getConsistencyMode(URL registryURL) {
+ String value = CONSISTENCY_MODE.getValue(registryURL);
+ if (StringUtils.isNotEmpty(value)) {
+ return ConsistencyMode.valueOf(value);
+ }
+ return ConsistencyMode.DEFAULT;
+ }
+
+ /**
+ * Get the "tags" from the {@link URL} of registry
+ *
+ * @param registryURL the {@link URL} of registry
+ * @return non-null
+ * @sine 2.7.8
+ */
+ private List<String> getTags(URL registryURL) {
+ String value = TAGS.getValue(registryURL);
+ return StringUtils.splitToList(value, COMMA_SEPARATOR_CHAR);
+ }
+
+ private List<String> getRegisteringTags(URL url) {
+ List<String> tags = new ArrayList<>();
+ String rawTag = url.getParameter(REGISTER_TAG);
+ if (StringUtils.isNotEmpty(rawTag)) {
+ tags.addAll(Arrays.asList(SEMICOLON_SPLIT_PATTERN.split(rawTag)));
+ }
+ return tags;
+ }
+
+ @Override
+ protected void doDestroy() throws Exception {
+ notifiers.forEach((_k, notifier) -> {
+ if (notifier != null) {
+ notifier.stop();
+ }
+ });
+ notifiers.clear();
+ notifierExecutor.shutdownNow();
+ ttlScheduler.stop();
+ }
+
+ @Override
+ public void doRegister(ServiceInstance serviceInstance) {
+ NewService consulService = buildService(serviceInstance);
+ ttlScheduler.add(consulService.getId());
+ client.agentServiceRegister(consulService, aclToken);
+ }
+
+ @Override
+ protected void doUnregister(ServiceInstance serviceInstance) {
+ String id = buildId(serviceInstance);
+ ttlScheduler.remove(id);
+ client.agentServiceDeregister(id, aclToken);
+ }
+
+ @Override
+ public synchronized void addServiceInstancesChangedListener(ServiceInstancesChangedListener listener)
+ throws NullPointerException, IllegalArgumentException {
+ Set<String> serviceNames = listener.getServiceNames();
+ for (String serviceName : serviceNames) {
+ ConsulNotifier notifier = notifiers.get(serviceName);
+ if (notifier == null) {
+ Response<List<HealthService>> response = getHealthServices(serviceName, -1, buildWatchTimeout());
+ Long consulIndex = response.getConsulIndex();
+ notifier = new ConsulNotifier(serviceName, consulIndex);
+ notifiers.put(serviceName, notifier);
+ }
+ notifier.addListener(listener);
+ notifierExecutor.execute(notifier);
+ }
+ }
+
+ @Override
+ public synchronized void removeServiceInstancesChangedListener(ServiceInstancesChangedListener listener) throws IllegalArgumentException {
+ Set<String> serviceNames = listener.getServiceNames();
+ for (String serviceName : serviceNames) {
+ ConsulNotifier notifier = notifiers.get(serviceName);
+ if (notifier != null) {
+ notifier.removeListener(listener);
+ if (notifier.getListenerCount() == 0) {
+ notifier.stop();
+ notifiers.remove(serviceName);
+ }
+ }
+ }
+ }
+
+ @Override
+ public Set<String> getServices() {
+ CatalogServicesRequest request = CatalogServicesRequest.newBuilder()
+ .setQueryParams(QueryParams.DEFAULT)
+ .setToken(aclToken)
+ .build();
+ return this.client.getCatalogServices(request).getValue().keySet();
+ }
+
+ @Override
+ public List<ServiceInstance> getInstances(String serviceName) throws NullPointerException {
+ Response<List<HealthService>> response = getHealthServices(serviceName, -1, buildWatchTimeout());
+ return convert(response.getValue());
+ }
+
+ private List<ServiceInstance> convert(List<HealthService> services) {
+ return services.stream()
+ .map(HealthService::getService)
+ .map(service -> {
+ ServiceInstance instance = new DefaultServiceInstance(
+ service.getService(),
+ service.getAddress(),
+ service.getPort(),
+ applicationModel);
+ instance.getMetadata().putAll(getMetadata(service));
+ return instance;
+ })
+ .collect(Collectors.toList());
+ }
+
+ private Response<List<HealthService>> getHealthServices(String service, long index, int watchTimeout) {
+ HealthServicesRequest request = HealthServicesRequest.newBuilder()
+ .setTag(tag)
+ .setQueryParams(new QueryParams(watchTimeout, index))
+ .setPassing(true)
+ .build();
+ return client.getHealthServices(service, request);
+ }
+
+ private Map<String, String> getMetadata(HealthService.Service service) {
+ Map<String, String> metadata = service.getMeta();
+ metadata = decodeMetadata(metadata);
+ if (CollectionUtils.isEmptyMap(metadata)) {
+ metadata = getScCompatibleMetadata(service.getTags());
+ }
+ return metadata;
+ }
+
+ private Map<String, String> getScCompatibleMetadata(List<String> tags) {
+ LinkedHashMap<String, String> metadata = new LinkedHashMap<>();
+ if (tags != null) {
+ for (String tag : tags) {
+ String[] parts = StringUtils.delimitedListToStringArray(tag, "=");
+ switch (parts.length) {
+ case 0:
+ break;
+ case 1:
+ metadata.put(parts[0], parts[0]);
+ break;
+ case 2:
+ metadata.put(parts[0], parts[1]);
+ break;
+ default:
+ String[] end = Arrays.copyOfRange(parts, 1, parts.length);
+ metadata.put(parts[0], StringUtils.arrayToDelimitedString(end, "="));
+ break;
+ }
+
+ }
+ }
+
+ return metadata;
+ }
+
+ private NewService buildService(ServiceInstance serviceInstance) {
+ NewService service = new NewService();
+ service.setAddress(serviceInstance.getHost());
+ service.setPort(serviceInstance.getPort());
+ service.setId(buildId(serviceInstance));
+ service.setName(serviceInstance.getServiceName());
+ service.setCheck(buildCheck(serviceInstance));
+ service.setTags(buildTags(serviceInstance));
+ return service;
+ }
+
+ private String buildId(ServiceInstance serviceInstance) {
+ return Integer.toHexString(serviceInstance.hashCode());
+ }
+
+ private List<String> buildTags(ServiceInstance serviceInstance) {
+ List<String> tags = new LinkedList<>(this.tags);
+
+ if (StringUtils.isNotEmpty(instanceZone)) {
+ tags.add(defaultZoneMetadataName + "=" + instanceZone);
+ }
+
+ if (StringUtils.isNotEmpty(instanceGroup)) {
+ tags.add("group=" + instanceGroup);
+ }
+
+ Map<String, String> params = serviceInstance.getMetadata();
+ params.keySet().stream()
+ .map(k -> k + "=" + params.get(k))
+ .forEach(tags::add);
+
+ tags.addAll(registeringTags);
+ return tags;
+ }
+
+ private Map<String, String> decodeMetadata(Map<String, String> metadata) {
+ if (metadata == null) {
+ return metadata;
+ }
+ Map<String, String> decoded = new HashMap<>(metadata.size());
+ metadata.forEach((k, v) -> decoded.put(new String(Base64.getDecoder().decode(k)), v));
+ return decoded;
+ }
+
+ private NewService.Check buildCheck(ServiceInstance serviceInstance) {
+ NewService.Check check = new NewService.Check();
+ check.setTtl((checkPassInterval / ONE_THOUSAND) + "s");
+ String deregister = serviceInstance.getMetadata().get(DEREGISTER_AFTER);
+ check.setDeregisterCriticalServiceAfter(deregister == null ? DEFAULT_DEREGISTER_TIME : deregister);
+ return check;
+ }
+
+ private int buildWatchTimeout() {
+ return url.getParameter(WATCH_TIMEOUT, DEFAULT_WATCH_TIMEOUT) / ONE_THOUSAND;
+ }
+
+ private class ConsulNotifier implements Runnable {
+ private final String serviceName;
+ private long consulIndex;
+ private boolean running;
+
+ private final List<ServiceInstancesChangedListener> listener;
+
+ ConsulNotifier(String serviceName, long consulIndex) {
+ this.serviceName = serviceName;
+ this.consulIndex = consulIndex;
+ this.running = true;
+ this.listener = new CopyOnWriteArrayList<>();
+ }
+
+ @Override
+ public void run() {
+ while (this.running) {
+ processService();
+ }
+ }
+
+ private void processService() {
+ Response<List<HealthService>> response = getHealthServices(serviceName, consulIndex, Integer.MAX_VALUE);
+ Long currentIndex = response.getConsulIndex();
+ if (currentIndex != null && currentIndex > consulIndex) {
+ consulIndex = currentIndex;
+ List<HealthService> services = response.getValue();
+ List<ServiceInstance> serviceInstances = convert(services);
+ listener.forEach(l -> l.onEvent(new ServiceInstancesChangedEvent(serviceName, serviceInstances)));
+ }
+ }
+
+ public void addListener(ServiceInstancesChangedListener listener) {
+ this.listener.add(listener);
+ }
+
+ public void removeListener(ServiceInstancesChangedListener listener) {
+ this.listener.remove(listener);
+ }
+
+ public int getListenerCount() {
+ return this.listener.size();
+ }
+
+ void stop() {
+ this.running = false;
+ }
+ }
+
+ private static class TtlScheduler {
+
+ private static final Logger logger = LoggerFactory.getLogger(TtlScheduler.class);
+
+ private final Map<String, ScheduledFuture> serviceHeartbeats = new ConcurrentHashMap<>();
+
+ private ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
+
+ private long checkInterval;
+
+ private ConsulClient client;
+
+ public TtlScheduler(long checkInterval, ConsulClient client) {
+ this.checkInterval = checkInterval;
+ this.client = client;
+ }
+
+ /**
+ * Add a service to the checks loop.
+ *
+ * @param instanceId instance id
+ */
+ public void add(String instanceId) {
+ ScheduledFuture task = this.scheduler.scheduleAtFixedRate(
+ new ConsulHeartbeatTask(instanceId),
+ checkInterval / PERIOD_DENOMINATOR,
+ checkInterval / PERIOD_DENOMINATOR,
+ TimeUnit.MILLISECONDS);
+ ScheduledFuture previousTask = this.serviceHeartbeats.put(instanceId, task);
+ if (previousTask != null) {
+ previousTask.cancel(true);
+ }
+ }
+
+ public void remove(String instanceId) {
+ ScheduledFuture task = this.serviceHeartbeats.get(instanceId);
+ if (task != null) {
+ task.cancel(true);
+ }
+ this.serviceHeartbeats.remove(instanceId);
+ }
+
+ private class ConsulHeartbeatTask implements Runnable {
+
+ private String checkId;
+
+ ConsulHeartbeatTask(String serviceId) {
+ this.checkId = serviceId;
+ if (!this.checkId.startsWith("service:")) {
+ this.checkId = "service:" + this.checkId;
+ }
+ }
+
+ @Override
+ public void run() {
+ TtlScheduler.this.client.agentCheckPass(this.checkId);
+ if (logger.isDebugEnabled()) {
+ logger.debug("Sending consul heartbeat for: " + this.checkId);
+ }
+ }
+
+ }
+
+ public void stop() {
+ scheduler.shutdownNow();
+ }
+
+ }
+}
diff --git a/dubbo-registry-extensions/dubbo-registry-consul/src/main/java/org/apache/dubbo/registry/consul/ConsulServiceDiscoveryFactory.java b/dubbo-registry-extensions/dubbo-registry-consul/src/main/java/org/apache/dubbo/registry/consul/ConsulServiceDiscoveryFactory.java
new file mode 100644
index 0000000..dcb0b5d
--- /dev/null
+++ b/dubbo-registry-extensions/dubbo-registry-consul/src/main/java/org/apache/dubbo/registry/consul/ConsulServiceDiscoveryFactory.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dubbo.registry.consul;
+
+import org.apache.dubbo.common.URL;
+import org.apache.dubbo.registry.client.AbstractServiceDiscoveryFactory;
+import org.apache.dubbo.registry.client.ServiceDiscovery;
+
+public class ConsulServiceDiscoveryFactory extends AbstractServiceDiscoveryFactory {
+
+ @Override
+ protected ServiceDiscovery createDiscovery(URL registryURL) {
+ return new ConsulServiceDiscovery(applicationModel, registryURL);
+ }
+
+}
diff --git a/dubbo-registry-extensions/dubbo-registry-consul/src/main/resources/META-INF/dubbo/internal/org.apache.dubbo.registry.RegistryFactory b/dubbo-registry-extensions/dubbo-registry-consul/src/main/resources/META-INF/dubbo/internal/org.apache.dubbo.registry.RegistryFactory
new file mode 100644
index 0000000..7aea18f
--- /dev/null
+++ b/dubbo-registry-extensions/dubbo-registry-consul/src/main/resources/META-INF/dubbo/internal/org.apache.dubbo.registry.RegistryFactory
@@ -0,0 +1 @@
+consul=org.apache.dubbo.registry.consul.ConsulRegistryFactory
diff --git a/dubbo-registry-extensions/dubbo-registry-consul/src/main/resources/META-INF/dubbo/internal/org.apache.dubbo.registry.client.ServiceDiscoveryFactory b/dubbo-registry-extensions/dubbo-registry-consul/src/main/resources/META-INF/dubbo/internal/org.apache.dubbo.registry.client.ServiceDiscoveryFactory
new file mode 100644
index 0000000..a0f1252
--- /dev/null
+++ b/dubbo-registry-extensions/dubbo-registry-consul/src/main/resources/META-INF/dubbo/internal/org.apache.dubbo.registry.client.ServiceDiscoveryFactory
@@ -0,0 +1 @@
+consul=org.apache.dubbo.registry.consul.ConsulServiceDiscoveryFactory
\ No newline at end of file
diff --git a/dubbo-registry-extensions/dubbo-registry-consul/src/test/java/org/apache/dubbo/registry/consul/ConsulRegistryTest.java b/dubbo-registry-extensions/dubbo-registry-consul/src/test/java/org/apache/dubbo/registry/consul/ConsulRegistryTest.java
new file mode 100644
index 0000000..fcd0c99
--- /dev/null
+++ b/dubbo-registry-extensions/dubbo-registry-consul/src/test/java/org/apache/dubbo/registry/consul/ConsulRegistryTest.java
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dubbo.registry.consul;
+
+import org.apache.dubbo.common.URL;
+import org.apache.dubbo.common.status.Status;
+import org.apache.dubbo.common.utils.NetUtils;
+import org.apache.dubbo.registry.NotifyListener;
+import org.apache.dubbo.registry.Registry;
+import org.apache.dubbo.registry.status.RegistryStatusChecker;
+import org.apache.dubbo.rpc.model.ApplicationModel;
+
+import com.pszymczyk.consul.ConsulProcess;
+import com.pszymczyk.consul.ConsulStarterBuilder;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.CoreMatchers.not;
+import static org.hamcrest.CoreMatchers.nullValue;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.mockito.Mockito.mock;
+
+public class ConsulRegistryTest {
+
+ private ConsulProcess consul;
+ private ConsulRegistry consulRegistry;
+ private String service = "org.apache.dubbo.test.injvmServie";
+ private URL serviceUrl = URL.valueOf("consul://127.0.0.1:" + NetUtils.getAvailablePort() + "/" + service + "?notify=false&methods=test1,test2");
+ private URL registryUrl;
+ private ConsulRegistryFactory consulRegistryFactory;
+
+ @BeforeEach
+ public void setUp() {
+ Exception exception = null;
+ for (int i = 0; i < 10; i++) {
+ try {
+ this.consul = ConsulStarterBuilder.consulStarter()
+ .build()
+ .start();
+ exception = null;
+ } catch (Exception e) {
+ exception = e;
+ e.printStackTrace();
+ }
+ if (exception == null) {
+ break;
+ }
+ }
+ Assertions.assertNull(exception);
+ this.registryUrl = URL.valueOf("consul://localhost:" + consul.getHttpPort());
+
+ consulRegistryFactory = new ConsulRegistryFactory();
+ this.consulRegistry = (ConsulRegistry) consulRegistryFactory.createRegistry(registryUrl);
+ consulRegistryFactory.setApplicationModel(ApplicationModel.defaultModel());
+ }
+
+ @AfterEach
+ public void tearDown() throws Exception {
+ consul.close();
+ this.consulRegistry.destroy();
+ }
+
+ @Test
+ public void testRegister() {
+ Set<URL> registered;
+
+ for (int i = 0; i < 2; i++) {
+ consulRegistry.register(serviceUrl);
+ registered = consulRegistry.getRegistered();
+ assertThat(registered.contains(serviceUrl), is(true));
+ }
+
+ registered = consulRegistry.getRegistered();
+
+ assertThat(registered.size(), is(1));
+ }
+
+ @Test
+ public void testSubscribe() {
+ NotifyListener listener = mock(NotifyListener.class);
+ consulRegistry.subscribe(serviceUrl, listener);
+
+ Map<URL, Set<NotifyListener>> subscribed = consulRegistry.getSubscribed();
+ assertThat(subscribed.size(), is(1));
+ assertThat(subscribed.get(serviceUrl).size(), is(1));
+
+ consulRegistry.unsubscribe(serviceUrl, listener);
+ subscribed = consulRegistry.getSubscribed();
+ assertThat(subscribed.size(), is(1));
+ assertThat(subscribed.get(serviceUrl).size(), is(0));
+ }
+
+ @Test
+ public void testAvailable() {
+ consulRegistry.register(serviceUrl);
+ assertThat(consulRegistry.isAvailable(), is(true));
+
+// consulRegistry.destroy();
+// assertThat(consulRegistry.isAvailable(), is(false));
+ }
+
+ @Test
+ public void testLookup() throws InterruptedException {
+ List<URL> lookup = consulRegistry.lookup(serviceUrl);
+ assertThat(lookup.size(), is(0));
+
+ consulRegistry.register(serviceUrl);
+ Thread.sleep(5000);
+ lookup = consulRegistry.lookup(serviceUrl);
+ assertThat(lookup.size(), is(1));
+ }
+
+ @Test
+ public void testStatusChecker() {
+ RegistryStatusChecker registryStatusChecker = new RegistryStatusChecker(ApplicationModel.defaultModel());
+ Status status = registryStatusChecker.check();
+ assertThat(status.getLevel(), is(Status.Level.UNKNOWN));
+
+ Registry registry = consulRegistryFactory.getRegistry(registryUrl);
+ assertThat(registry, not(nullValue()));
+
+ status = registryStatusChecker.check();
+ assertThat(status.getLevel(), is(Status.Level.OK));
+
+ registry.register(serviceUrl);
+ status = registryStatusChecker.check();
+ assertThat(status.getLevel(), is(Status.Level.OK));
+ }
+
+}
diff --git a/dubbo-registry-extensions/dubbo-registry-consul/src/test/java/org/apache/dubbo/registry/consul/ConsulServiceDiscoveryTest.java b/dubbo-registry-extensions/dubbo-registry-consul/src/test/java/org/apache/dubbo/registry/consul/ConsulServiceDiscoveryTest.java
new file mode 100644
index 0000000..f6e468f
--- /dev/null
+++ b/dubbo-registry-extensions/dubbo-registry-consul/src/test/java/org/apache/dubbo/registry/consul/ConsulServiceDiscoveryTest.java
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dubbo.registry.consul;
+
+import org.apache.dubbo.common.URL;
+import org.apache.dubbo.config.ApplicationConfig;
+import org.apache.dubbo.registry.client.DefaultServiceInstance;
+import org.apache.dubbo.registry.client.ServiceInstance;
+import org.apache.dubbo.registry.client.event.ServiceInstancesChangedEvent;
+import org.apache.dubbo.registry.client.event.listener.ServiceInstancesChangedListener;
+import org.apache.dubbo.rpc.model.ApplicationModel;
+import org.apache.dubbo.rpc.model.FrameworkModel;
+
+import com.pszymczyk.consul.ConsulProcess;
+import com.pszymczyk.consul.ConsulStarterBuilder;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.mockito.ArgumentCaptor;
+import org.mockito.Mockito;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+public class ConsulServiceDiscoveryTest {
+
+ private URL url;
+ private ConsulServiceDiscovery consulServiceDiscovery;
+ private ConsulProcess consul;
+ private static final String SERVICE_NAME = "A";
+ private static final String LOCALHOST = "127.0.0.1";
+
+ private ApplicationModel applicationModel;
+
+ @BeforeEach
+ public void init() throws Exception {
+ this.applicationModel = FrameworkModel.defaultModel().newApplication();
+ applicationModel.getApplicationConfigManager().setApplication(new ApplicationConfig("A"));
+ this.consul = ConsulStarterBuilder.consulStarter()
+ .build()
+ .start();
+ url = URL.valueOf("consul://localhost:" + consul.getHttpPort()).addParameter("consul-check-pass-interval", "1000");
+ consulServiceDiscovery = new ConsulServiceDiscovery(applicationModel, url);
+ }
+
+ @AfterEach
+ public void close() throws Exception {
+ consulServiceDiscovery.destroy();
+ consul.close();
+ this.applicationModel.destroy();
+ }
+
+ @Test
+ public void testRegister() throws InterruptedException {
+ ServiceInstance serviceInstance = new DefaultServiceInstance(SERVICE_NAME, LOCALHOST, 8080, applicationModel);
+ serviceInstance.getMetadata().put("test", "test");
+ serviceInstance.getMetadata().put("test123", "test");
+ consulServiceDiscovery.doRegister(serviceInstance);
+ Thread.sleep(3000);
+ List<ServiceInstance> serviceInstances = consulServiceDiscovery.getInstances(SERVICE_NAME);
+ assertEquals(1, serviceInstances.size());
+ assertEquals(serviceInstance, serviceInstances.get(0));
+
+ ServiceInstance serviceInstance2 = new DefaultServiceInstance(SERVICE_NAME, LOCALHOST, 8081, applicationModel);
+ serviceInstance2.getMetadata().put("test", "test");
+ serviceInstance2.getMetadata().put("test123", "test");
+ consulServiceDiscovery.doRegister(serviceInstance2);
+
+ Thread.sleep(3000);
+ serviceInstances = consulServiceDiscovery.getInstances(SERVICE_NAME);
+ assertEquals(2, serviceInstances.size());
+ assertEquals(Arrays.asList(serviceInstance, serviceInstance2), serviceInstances);
+
+ consulServiceDiscovery.doUnregister(serviceInstance);
+ Thread.sleep(3000);
+ serviceInstances = consulServiceDiscovery.getInstances(SERVICE_NAME);
+ assertEquals(1, serviceInstances.size());
+ assertEquals(serviceInstance2, serviceInstances.get(0));
+
+ consulServiceDiscovery.doUnregister(serviceInstance2);
+ Thread.sleep(3000);
+ serviceInstances = consulServiceDiscovery.getInstances(SERVICE_NAME);
+ assertEquals(0, serviceInstances.size());
+ }
+
+ @Test
+ public void testNotify() throws InterruptedException {
+ ServiceInstancesChangedListener serviceInstancesChangedListener1 = Mockito.spy(new ServiceInstancesChangedListener(Collections.singleton(SERVICE_NAME), consulServiceDiscovery));
+ ServiceInstancesChangedListener serviceInstancesChangedListener2 = Mockito.spy(new ServiceInstancesChangedListener(Collections.singleton(SERVICE_NAME), consulServiceDiscovery));
+ consulServiceDiscovery.addServiceInstancesChangedListener(serviceInstancesChangedListener1);
+ consulServiceDiscovery.addServiceInstancesChangedListener(serviceInstancesChangedListener2);
+ consulServiceDiscovery.removeServiceInstancesChangedListener(serviceInstancesChangedListener2);
+ ArgumentCaptor<ServiceInstancesChangedEvent> eventArgumentCaptor = ArgumentCaptor.forClass(ServiceInstancesChangedEvent.class);
+
+ ServiceInstance serviceInstance = new DefaultServiceInstance(SERVICE_NAME, LOCALHOST, 8080, applicationModel);
+ serviceInstance.getMetadata().put("test", "test");
+ serviceInstance.getMetadata().put("test123", "test");
+ consulServiceDiscovery.doRegister(serviceInstance);
+ Thread.sleep(3000);
+
+ Mockito.verify(serviceInstancesChangedListener1, Mockito.atLeast(1)).onEvent(eventArgumentCaptor.capture());
+ List<ServiceInstance> serviceInstances = eventArgumentCaptor.getValue().getServiceInstances();
+ assertEquals(1, serviceInstances.size());
+ assertEquals(serviceInstance, serviceInstances.get(0));
+
+ ServiceInstance serviceInstance2 = new DefaultServiceInstance(SERVICE_NAME, LOCALHOST, 8081, applicationModel);
+ serviceInstance2.getMetadata().put("test", "test");
+ serviceInstance2.getMetadata().put("test123", "test");
+ consulServiceDiscovery.doRegister(serviceInstance2);
+
+ Thread.sleep(3000);
+ Mockito.verify(serviceInstancesChangedListener1, Mockito.atLeast(2)).onEvent(eventArgumentCaptor.capture());
+ serviceInstances = eventArgumentCaptor.getValue().getServiceInstances();
+ assertEquals(2, serviceInstances.size());
+ assertEquals(Arrays.asList(serviceInstance, serviceInstance2), serviceInstances);
+
+ consulServiceDiscovery.doUnregister(serviceInstance);
+ Thread.sleep(3000);
+ Mockito.verify(serviceInstancesChangedListener1, Mockito.atLeast(3)).onEvent(eventArgumentCaptor.capture());
+ serviceInstances = eventArgumentCaptor.getValue().getServiceInstances();
+ assertEquals(1, serviceInstances.size());
+ assertEquals(serviceInstance2, serviceInstances.get(0));
+
+ consulServiceDiscovery.doUnregister(serviceInstance2);
+ Thread.sleep(3000);
+ Mockito.verify(serviceInstancesChangedListener1, Mockito.atLeast(4)).onEvent(eventArgumentCaptor.capture());
+ serviceInstances = eventArgumentCaptor.getValue().getServiceInstances();
+ assertEquals(0, serviceInstances.size());
+
+ Mockito.verify(serviceInstancesChangedListener2, Mockito.never()).onEvent(Mockito.any());
+ }
+}
diff --git a/dubbo-registry-extensions/dubbo-registry-etcd3/pom.xml b/dubbo-registry-extensions/dubbo-registry-etcd3/pom.xml
new file mode 100644
index 0000000..edf71af
--- /dev/null
+++ b/dubbo-registry-extensions/dubbo-registry-etcd3/pom.xml
@@ -0,0 +1,51 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ ~ Licensed to the Apache Software Foundation (ASF) under one or more
+ ~ contributor license agreements. See the NOTICE file distributed with
+ ~ this work for additional information regarding copyright ownership.
+ ~ The ASF licenses this file to You under the Apache License, Version 2.0
+ ~ (the "License"); you may not use this file except in compliance with
+ ~ the License. You may obtain a copy of the License at
+ ~
+ ~ http://www.apache.org/licenses/LICENSE-2.0
+ ~
+ ~ Unless required by applicable law or agreed to in writing, software
+ ~ distributed under the License is distributed on an "AS IS" BASIS,
+ ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ ~ See the License for the specific language governing permissions and
+ ~ limitations under the License.
+ -->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <parent>
+ <artifactId>dubbo-registry-extensions</artifactId>
+ <groupId>org.apache.dubbo.extensions</groupId>
+ <version>${revision}</version>
+ <relativePath>../pom.xml</relativePath>
+ </parent>
+ <modelVersion>4.0.0</modelVersion>
+
+ <version>1.0.0-SNAPSHOT</version>
+ <artifactId>dubbo-registry-etcd3</artifactId>
+ <packaging>jar</packaging>
+ <name>${project.artifactId}</name>
+ <description>The etcd3 registry module of Dubbo project</description>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.dubbo</groupId>
+ <artifactId>dubbo-registry-api</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.dubbo</groupId>
+ <artifactId>dubbo-common</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.dubbo.extensions</groupId>
+ <artifactId>dubbo-remoting-etcd3</artifactId>
+ <version>1.0.0-SNAPSHOT</version>
+ </dependency>
+ </dependencies>
+
+
+</project>
diff --git a/dubbo-registry-extensions/dubbo-registry-etcd3/src/main/java/org/apache/dubbo/registry/etcd/EtcdRegistry.java b/dubbo-registry-extensions/dubbo-registry-etcd3/src/main/java/org/apache/dubbo/registry/etcd/EtcdRegistry.java
new file mode 100644
index 0000000..ef7ee63
--- /dev/null
+++ b/dubbo-registry-extensions/dubbo-registry-etcd3/src/main/java/org/apache/dubbo/registry/etcd/EtcdRegistry.java
@@ -0,0 +1,365 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dubbo.registry.etcd;
+
+import org.apache.dubbo.common.URL;
+import org.apache.dubbo.common.utils.CollectionUtils;
+import org.apache.dubbo.common.utils.ConcurrentHashSet;
+import org.apache.dubbo.common.utils.UrlUtils;
+import org.apache.dubbo.registry.NotifyListener;
+import org.apache.dubbo.registry.support.FailbackRegistry;
+import org.apache.dubbo.remoting.etcd.ChildListener;
+import org.apache.dubbo.remoting.etcd.Constants;
+import org.apache.dubbo.remoting.etcd.EtcdClient;
+import org.apache.dubbo.remoting.etcd.EtcdTransporter;
+import org.apache.dubbo.remoting.etcd.StateListener;
+import org.apache.dubbo.remoting.etcd.option.OptionUtil;
+import org.apache.dubbo.rpc.RpcException;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+
+import static org.apache.dubbo.common.constants.CommonConstants.ANY_VALUE;
+import static org.apache.dubbo.common.constants.CommonConstants.GROUP_KEY;
+import static org.apache.dubbo.common.constants.CommonConstants.INTERFACE_KEY;
+import static org.apache.dubbo.common.constants.CommonConstants.PATH_SEPARATOR;
+import static org.apache.dubbo.common.constants.RegistryConstants.CATEGORY_KEY;
+import static org.apache.dubbo.common.constants.RegistryConstants.CONFIGURATORS_CATEGORY;
+import static org.apache.dubbo.common.constants.RegistryConstants.CONSUMERS_CATEGORY;
+import static org.apache.dubbo.common.constants.RegistryConstants.DEFAULT_CATEGORY;
+import static org.apache.dubbo.common.constants.RegistryConstants.DYNAMIC_KEY;
+import static org.apache.dubbo.common.constants.RegistryConstants.EMPTY_PROTOCOL;
+import static org.apache.dubbo.common.constants.RegistryConstants.PROVIDERS_CATEGORY;
+import static org.apache.dubbo.common.constants.RegistryConstants.ROUTERS_CATEGORY;
+import static org.apache.dubbo.remoting.Constants.CHECK_KEY;
+
+
+/**
+ * Support for ectd3 registry.
+ */
+public class EtcdRegistry extends FailbackRegistry {
+
+ private static final int DEFAULT_ETCD_PORT = 2379;
+
+ private static final String DEFAULT_ROOT = "dubbo";
+
+ private final String root;
+
+ private final Set<String> anyServices = new ConcurrentHashSet<>();
+
+ private final ConcurrentMap<URL, ConcurrentMap<NotifyListener, ChildListener>> etcdListeners = new ConcurrentHashMap<>();
+ private final EtcdClient etcdClient;
+
+ public EtcdRegistry(URL url, EtcdTransporter etcdTransporter) {
+ super(url);
+ if (url.isAnyHost()) {
+ throw new IllegalStateException("registry address is invalid, actual: '" + url.getHost() + "'");
+ }
+ String group = url.getParameter(GROUP_KEY, DEFAULT_ROOT);
+ if (!group.startsWith(PATH_SEPARATOR)) {
+ group = PATH_SEPARATOR + group;
+ }
+ this.root = group;
+ etcdClient = etcdTransporter.connect(url);
+
+ etcdClient.addStateListener(state -> {
+ if (state == StateListener.CONNECTED) {
+ try {
+ recover();
+ } catch (Exception e) {
+ logger.error(e.getMessage(), e);
+ }
+ }
+ });
+ }
+
+ protected static String appendDefaultPort(String address) {
+ if (address != null && address.length() > 0) {
+ int i = address.indexOf(':');
+ if (i < 0) {
+ return address + ":" + DEFAULT_ETCD_PORT;
+ } else if (Integer.parseInt(address.substring(i + 1)) == 0) {
+ return address.substring(0, i + 1) + DEFAULT_ETCD_PORT;
+ }
+ }
+ return address;
+ }
+
+ @Override
+ public void doRegister(URL url) {
+ try {
+ String path = toUrlPath(url);
+ if (url.getParameter(DYNAMIC_KEY, true)) {
+ etcdClient.createEphemeral(path);
+ return;
+ }
+ etcdClient.create(path);
+ } catch (Throwable e) {
+ throw new RpcException("Failed to register " + url + " to etcd " + getUrl()
+ + ", cause: " + (OptionUtil.isProtocolError(e)
+ ? "etcd3 registry may not be supported yet or etcd3 registry is not available."
+ : e.getMessage()), e);
+ }
+ }
+
+ @Override
+ public void doUnregister(URL url) {
+ try {
+ String path = toUrlPath(url);
+ etcdClient.delete(path);
+ } catch (Throwable e) {
+ throw new RpcException("Failed to unregister " + url + " to etcd " + getUrl() + ", cause: " + e.getMessage(), e);
+ }
+ }
+
+ @Override
+ public void doSubscribe(URL url, NotifyListener listener) {
+ try {
+ if (ANY_VALUE.equals(url.getServiceInterface())) {
+ String root = toRootPath();
+
+ /*
+ * if we are interested in all interfaces,
+ * find out the current container or create one for the url, put or get only once.
+ */
+ ConcurrentMap<NotifyListener, ChildListener> listeners =
+ Optional.ofNullable(etcdListeners.get(url))
+ .orElseGet(() -> {
+ ConcurrentMap<NotifyListener, ChildListener> container, prev;
+ prev = etcdListeners.putIfAbsent(url, container = new ConcurrentHashMap<>());
+ return prev != null ? prev : container;
+ });
+
+ /*
+ * if we have no interface watcher listener,
+ * find the current listener or create one for the current root, put or get only once.
+ */
+ ChildListener interfaceListener =
+ Optional.ofNullable(listeners.get(listener))
+ .orElseGet(() -> {
+ ChildListener childListener, prev;
+ prev = listeners.putIfAbsent(listener, childListener = (parentPath, currentChildren) -> {
+ /*
+ * because etcd3 does not support direct children watch events,
+ * we should filter not interface events. if we watch /dubbo
+ * and /dubbo/interface, when we put a key-value pair {/dubbo/interface/hello hello},
+ * we will got events in watching path /dubbo.
+ */
+ for (String child : currentChildren) {
+ child = URL.decode(child);
+ if (!anyServices.contains(child)) {
+ anyServices.add(child);
+ /*
+ * if new interface event arrived, we watch their direct children,
+ * eg: /dubbo/interface, /dubbo/interface and so on.
+ */
+ subscribe(url.setPath(child).addParameters(INTERFACE_KEY, child,
+ CHECK_KEY, String.valueOf(false)), listener);
+ }
+ }
+ });
+ return prev != null ? prev : childListener;
+ });
+
+ etcdClient.create(root);
+ /*
+ * at the first time, we want to pull already interface and then watch their direct children,
+ * eg: /dubbo/interface, /dubbo/interface and so on.
+ */
+ List<String> services = etcdClient.addChildListener(root, interfaceListener);
+ for (String service : services) {
+ service = URL.decode(service);
+ anyServices.add(service);
+ subscribe(url.setPath(service).addParameters(INTERFACE_KEY, service,
+ CHECK_KEY, String.valueOf(false)), listener);
+ }
+ } else {
+ List<URL> urls = new ArrayList<>();
+ for (String path : toCategoriesPath(url)) {
+
+ /*
+ * if we are interested in special categories (providers, consumers, routers and so on),
+ * we find out the current container or create one for the url, put or get only once.
+ */
+ ConcurrentMap<NotifyListener, ChildListener> listeners =
+ Optional.ofNullable(etcdListeners.get(url))
+ .orElseGet(() -> {
+ ConcurrentMap<NotifyListener, ChildListener> container, prev;
+ prev = etcdListeners.putIfAbsent(url,
+ container = new ConcurrentHashMap<>());
+ return prev != null ? prev : container;
+ });
+
+ /*
+ * if we have no category watcher listener,
+ * we find out the current listener or create one for the current category, put or get only once.
+ */
+ ChildListener childListener =
+ Optional.ofNullable(listeners.get(listener))
+ .orElseGet(() -> {
+ ChildListener watchListener, prev;
+ prev = listeners.putIfAbsent(listener, watchListener = (parentPath, currentChildren) -> EtcdRegistry.this.notify(url, listener,
+ toUrlsWithEmpty(url, parentPath, currentChildren)));
+ return prev != null ? prev : watchListener;
+ });
+
+ etcdClient.create(path);
+ /*
+ * at the first time, we want to pull already category and then watch their direct children,
+ * eg: /dubbo/interface/providers, /dubbo/interface/consumers and so on.
+ */
+ List<String> children = etcdClient.addChildListener(path, childListener);
+ if (children != null) {
+ urls.addAll(toUrlsWithEmpty(url, path, children));
+ }
+ }
+ notify(url, listener, urls);
+ }
+ } catch (Throwable e) {
+ throw new RpcException("Failed to subscribe " + url + " to etcd " + getUrl()
+ + ", cause: " + (OptionUtil.isProtocolError(e)
+ ? "etcd3 registry may not be supported yet or etcd3 registry is not available."
+ : e.getMessage()), e);
+ }
+ }
+
+ @Override
+ public void doUnsubscribe(URL url, NotifyListener listener) {
+ ConcurrentMap<NotifyListener, ChildListener> listeners = etcdListeners.get(url);
+ if (listeners != null) {
+ ChildListener etcdListener = listeners.remove(listener);
+ if (etcdListener != null) {
+ if (ANY_VALUE.equals(url.getServiceInterface())) {
+ String root = toRootPath();
+ etcdClient.removeChildListener(root, etcdListener);
+ }else {
+ // maybe url has many subscribed paths
+ for (String path : toUnsubscribedPath(url)) {
+ etcdClient.removeChildListener(path, etcdListener);
+ }
+ }
+ }
+
+ if(listeners.isEmpty()){
+ etcdListeners.remove(url);
+ }
+ }
+ }
+
+ @Override
+ public boolean isAvailable() {
+ return etcdClient.isConnected();
+ }
+
+ @Override
+ public void destroy() {
+ super.destroy();
+ try {
+ etcdClient.close();
+ } catch (Exception e) {
+ logger.warn("Failed to close etcd client " + getUrl() + ", cause: " + e.getMessage(), e);
+ }
+ }
+
+ protected String toRootDir() {
+ if (root.startsWith(PATH_SEPARATOR)) {
+ return root;
+ }
+ return PATH_SEPARATOR + root;
+ }
+
+ protected String toRootPath() {
+ return root;
+ }
+
+ protected String toServicePath(URL url) {
+ String name = url.getServiceInterface();
+ if (ANY_VALUE.equals(name)) {
+ return toRootPath();
+ }
+ return toRootDir() + PATH_SEPARATOR + URL.encode(name);
+ }
+
+ protected String[] toCategoriesPath(URL url) {
+ String[] categories;
+ if (ANY_VALUE.equals(url.getParameter(CATEGORY_KEY))) {
+ categories = new String[]{PROVIDERS_CATEGORY, CONSUMERS_CATEGORY, ROUTERS_CATEGORY, CONFIGURATORS_CATEGORY};
+ } else {
+ categories = url.getParameter(CATEGORY_KEY, new String[]{DEFAULT_CATEGORY});
+ }
+ String[] paths = new String[categories.length];
+ for (int i = 0; i < categories.length; i++) {
+ paths[i] = toServicePath(url) + PATH_SEPARATOR + categories[i];
+ }
+ return paths;
+ }
+
+ protected String toCategoryPath(URL url) {
+ return toServicePath(url) + PATH_SEPARATOR + url.getParameter(CATEGORY_KEY, DEFAULT_CATEGORY);
+ }
+
+ protected String toUrlPath(URL url) {
+ return toCategoryPath(url) + PATH_SEPARATOR + URL.encode(url.toFullString());
+ }
+
+ protected List<String> toUnsubscribedPath(URL url) {
+ List<String> categories = new ArrayList<>();
+ if (ANY_VALUE.equals(url.getServiceInterface())) {
+ String group = url.getParameter(GROUP_KEY, DEFAULT_ROOT);
+ if (!group.startsWith(PATH_SEPARATOR)) {
+ group = PATH_SEPARATOR + group;
+ }
+ categories.add(group);
+ return categories;
+ } else {
+ categories.addAll(Arrays.asList(toCategoriesPath(url)));
+ }
+ return categories;
+ }
+
+ protected List<URL> toUrlsWithoutEmpty(URL consumer, List<String> providers) {
+ List<URL> urls = new ArrayList<>();
+ if (providers != null && providers.size() > 0) {
+ for (String provider : providers) {
+ provider = URL.decode(provider);
+ if (provider.contains(Constants.HTTP_SUBFIX_KEY)) {
+ URL url = URL.valueOf(provider);
+ if (UrlUtils.isMatch(consumer, url)) {
+ urls.add(url);
+ }
+ }
+ }
+ }
+ return urls;
+ }
+
+ protected List<URL> toUrlsWithEmpty(URL consumer, String path, List<String> providers) {
+ List<URL> urls = toUrlsWithoutEmpty(consumer, providers);
+ if (CollectionUtils.isEmpty(urls)) {
+ int i = path.lastIndexOf('/');
+ String category = i < 0 ? path : path.substring(i + 1);
+ URL empty = consumer.setProtocol(EMPTY_PROTOCOL).addParameter(CATEGORY_KEY, category);
+ urls.add(empty);
+ }
+ return urls;
+ }
+}
diff --git a/dubbo-registry-extensions/dubbo-registry-etcd3/src/main/java/org/apache/dubbo/registry/etcd/EtcdRegistryFactory.java b/dubbo-registry-extensions/dubbo-registry-etcd3/src/main/java/org/apache/dubbo/registry/etcd/EtcdRegistryFactory.java
new file mode 100644
index 0000000..a7d00b3
--- /dev/null
+++ b/dubbo-registry-extensions/dubbo-registry-etcd3/src/main/java/org/apache/dubbo/registry/etcd/EtcdRegistryFactory.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dubbo.registry.etcd;
+
+import org.apache.dubbo.common.URL;
+import org.apache.dubbo.registry.Registry;
+import org.apache.dubbo.registry.support.AbstractRegistryFactory;
+import org.apache.dubbo.remoting.etcd.EtcdTransporter;
+
+public class EtcdRegistryFactory extends AbstractRegistryFactory {
+
+ private EtcdTransporter etcdTransporter;
+
+ @Override
+ protected Registry createRegistry(URL url) {
+ return new EtcdRegistry(url, etcdTransporter);
+ }
+
+ public void setEtcdTransporter(EtcdTransporter etcdTransporter) {
+ this.etcdTransporter = etcdTransporter;
+ }
+}
diff --git a/dubbo-registry-extensions/dubbo-registry-etcd3/src/main/java/org/apache/dubbo/registry/etcd/EtcdServiceDiscovery.java b/dubbo-registry-extensions/dubbo-registry-etcd3/src/main/java/org/apache/dubbo/registry/etcd/EtcdServiceDiscovery.java
new file mode 100644
index 0000000..a7bffa4
--- /dev/null
+++ b/dubbo-registry-extensions/dubbo-registry-etcd3/src/main/java/org/apache/dubbo/registry/etcd/EtcdServiceDiscovery.java
@@ -0,0 +1,240 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dubbo.registry.etcd;
+
+import org.apache.dubbo.common.URL;
+import org.apache.dubbo.common.logger.Logger;
+import org.apache.dubbo.common.logger.LoggerFactory;
+import org.apache.dubbo.common.utils.CollectionUtils;
+import org.apache.dubbo.common.utils.ConcurrentHashSet;
+import org.apache.dubbo.registry.client.AbstractServiceDiscovery;
+import org.apache.dubbo.registry.client.DefaultServiceInstance;
+import org.apache.dubbo.registry.client.ServiceInstance;
+import org.apache.dubbo.registry.client.event.ServiceInstancesChangedEvent;
+import org.apache.dubbo.registry.client.event.listener.ServiceInstancesChangedListener;
+import org.apache.dubbo.remoting.etcd.ChildListener;
+import org.apache.dubbo.remoting.etcd.EtcdClient;
+import org.apache.dubbo.remoting.etcd.EtcdTransporter;
+import org.apache.dubbo.remoting.etcd.StateListener;
+import org.apache.dubbo.remoting.etcd.option.OptionUtil;
+import org.apache.dubbo.rpc.RpcException;
+import org.apache.dubbo.rpc.model.ApplicationModel;
+
+import com.google.gson.Gson;
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CopyOnWriteArrayList;
+
+/**
+ * 2019-07-08
+ */
+public class EtcdServiceDiscovery extends AbstractServiceDiscovery {
+
+ private static final Logger logger = LoggerFactory.getLogger(EtcdServiceDiscovery.class);
+
+ private final String root = "/services";
+
+ private final Set<String> services = new ConcurrentHashSet<>();
+ private final Map<String, InstanceChildListener> childListenerMap = new ConcurrentHashMap<>();
+
+ EtcdClient etcdClient;
+
+ public EtcdServiceDiscovery(ApplicationModel applicationModel, URL registryURL) {
+ super(applicationModel, registryURL);
+ EtcdTransporter etcdTransporter = applicationModel.getExtensionLoader(EtcdTransporter.class).getAdaptiveExtension();
+
+ if (registryURL.isAnyHost()) {
+ throw new IllegalStateException("Service discovery address is invalid, actual: '" + registryURL.getHost() + "'");
+ }
+
+ etcdClient = etcdTransporter.connect(registryURL);
+
+ etcdClient.addStateListener(state -> {
+ if (state == StateListener.CONNECTED) {
+ try {
+ recover();
+ } catch (Exception e) {
+ logger.error(e.getMessage(), e);
+ }
+ }
+ });
+
+ this.registryURL = registryURL;
+ }
+
+ @Override
+ protected void doDestroy() throws Exception {
+ if (etcdClient != null && etcdClient.isConnected()) {
+ etcdClient.close();
+ }
+ }
+
+ @Override
+ public void doRegister(ServiceInstance serviceInstance) {
+ try {
+ String path = toPath(serviceInstance);
+ etcdClient.putEphemeral(path, new Gson().toJson(serviceInstance));
+ services.add(serviceInstance.getServiceName());
+ } catch (Throwable e) {
+ throw new RpcException("Failed to register " + serviceInstance + " to etcd " + etcdClient.getUrl()
+ + ", cause: " + (OptionUtil.isProtocolError(e)
+ ? "etcd3 registry may not be supported yet or etcd3 registry is not available."
+ : e.getMessage()), e);
+ }
+ }
+
+ protected String toPath(ServiceInstance serviceInstance) {
+ return root + File.separator + serviceInstance.getServiceName() + File.separator + serviceInstance.getHost()
+ + ":" + serviceInstance.getPort();
+ }
+
+ protected String toParentPath(String serviceName) {
+ return root + File.separator + serviceName;
+ }
+
+ @Override
+ protected void doUnregister(ServiceInstance serviceInstance) {
+ try {
+ String path = toPath(serviceInstance);
+ etcdClient.delete(path);
+ services.remove(serviceInstance.getServiceName());
+ } catch (Throwable e) {
+ throw new RpcException("Failed to unregister " + serviceInstance + " to etcd " + etcdClient.getUrl() + ", cause: " + e.getMessage(), e);
+ }
+ }
+
+ @Override
+ public Set<String> getServices() {
+ return Collections.unmodifiableSet(services);
+ }
+
+ @Override
+ public void addServiceInstancesChangedListener(ServiceInstancesChangedListener listener) throws NullPointerException, IllegalArgumentException {
+ for (String serviceName : listener.getServiceNames()) {
+ registerServiceWatcher(serviceName, listener);
+ }
+ }
+
+ @Override
+ public List<ServiceInstance> getInstances(String serviceName) {
+ List<String> children = etcdClient.getChildren(toParentPath(serviceName));
+ if (CollectionUtils.isEmpty(children)) {
+ return Collections.emptyList();
+ }
+ List<ServiceInstance> list = new ArrayList<>(children.size());
+ for (String child : children) {
+ ServiceInstance serviceInstance = new Gson().fromJson(etcdClient.getKVValue(child), DefaultServiceInstance.class);
+ list.add(serviceInstance);
+ }
+ return list;
+ }
+
+ protected void registerServiceWatcher(String serviceName, ServiceInstancesChangedListener listener) {
+ String path = root + File.separator + serviceName;
+
+ InstanceChildListener childListener = childListenerMap.get(serviceName);
+
+ if (childListener == null) {
+ childListener = new InstanceChildListener(serviceName);
+ childListenerMap.put(serviceName, childListener);
+ childListener.addListener(listener);
+
+ etcdClient.create(path);
+ etcdClient.addChildListener(path, childListener);
+ } else {
+ childListener.addListener(listener);
+ }
+ }
+
+ @Override
+ public void removeServiceInstancesChangedListener(ServiceInstancesChangedListener listener) throws IllegalArgumentException {
+ for (String serviceName : listener.getServiceNames()) {
+ unregisterServiceWatcher(serviceName, listener);
+ }
+ }
+
+ protected void unregisterServiceWatcher(String serviceName, ServiceInstancesChangedListener listener) {
+ String path = root + File.separator + serviceName;
+
+ InstanceChildListener childListener = childListenerMap.get(serviceName);
+
+ if (childListener != null) {
+ childListener.removeListener(listener);
+
+ if (childListener.getListenerCount() == 0) {
+ etcdClient.removeChildListener(path, childListener);
+ }
+ }
+ }
+
+ public class InstanceChildListener implements ChildListener {
+ private final List<ServiceInstancesChangedListener> listeners;
+
+ private final String serviceName;
+
+ public InstanceChildListener(String serviceName) {
+ this.serviceName = serviceName;
+ this.listeners = new CopyOnWriteArrayList<>();
+ }
+
+ @Override
+ public void childChanged(String path, List<String> children) {
+ List<ServiceInstance> list = new ArrayList<>(children.size());
+ for (String child : children) {
+ ServiceInstance serviceInstance = new Gson().fromJson(etcdClient.getKVValue(child), DefaultServiceInstance.class);
+ list.add(serviceInstance);
+ }
+
+ for (ServiceInstancesChangedListener listener : listeners) {
+ listener.onEvent(new ServiceInstancesChangedEvent(serviceName, list));
+ }
+ }
+
+ public void addListener(ServiceInstancesChangedListener listener) {
+ listeners.add(listener);
+ }
+
+ public void removeListener(ServiceInstancesChangedListener listener) {
+ listeners.remove(listener);
+ }
+
+ public int getListenerCount() {
+ return listeners.size();
+ }
+ }
+
+ private void recover() throws Exception {
+ // register
+ if (serviceInstance != null) {
+ if (logger.isInfoEnabled()) {
+ logger.info("Recover application register: " + serviceInstance);
+ }
+ doRegister(serviceInstance);
+ }
+ }
+
+ @Override
+ public URL getUrl() {
+ return registryURL;
+ }
+}
diff --git a/dubbo-registry-extensions/dubbo-registry-etcd3/src/main/java/org/apache/dubbo/registry/etcd/EtcdServiceDiscoveryFactory.java b/dubbo-registry-extensions/dubbo-registry-etcd3/src/main/java/org/apache/dubbo/registry/etcd/EtcdServiceDiscoveryFactory.java
new file mode 100644
index 0000000..8a0e0c2
--- /dev/null
+++ b/dubbo-registry-extensions/dubbo-registry-etcd3/src/main/java/org/apache/dubbo/registry/etcd/EtcdServiceDiscoveryFactory.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dubbo.registry.etcd;
+
+import org.apache.dubbo.common.URL;
+import org.apache.dubbo.registry.client.AbstractServiceDiscoveryFactory;
+import org.apache.dubbo.registry.client.ServiceDiscovery;
+
+public class EtcdServiceDiscoveryFactory extends AbstractServiceDiscoveryFactory {
+
+ @Override
+ protected ServiceDiscovery createDiscovery(URL registryURL) {
+ return new EtcdServiceDiscovery(applicationModel, registryURL);
+ }
+
+}
diff --git a/dubbo-registry-extensions/dubbo-registry-etcd3/src/main/resources/META-INF/dubbo/internal/org.apache.dubbo.registry.RegistryFactory b/dubbo-registry-extensions/dubbo-registry-etcd3/src/main/resources/META-INF/dubbo/internal/org.apache.dubbo.registry.RegistryFactory
new file mode 100644
index 0000000..4a6d09c
--- /dev/null
+++ b/dubbo-registry-extensions/dubbo-registry-etcd3/src/main/resources/META-INF/dubbo/internal/org.apache.dubbo.registry.RegistryFactory
@@ -0,0 +1 @@
+etcd3=org.apache.dubbo.registry.etcd.EtcdRegistryFactory
\ No newline at end of file
diff --git a/dubbo-registry-extensions/dubbo-registry-etcd3/src/main/resources/META-INF/dubbo/internal/org.apache.dubbo.registry.client.ServiceDiscoveryFactory b/dubbo-registry-extensions/dubbo-registry-etcd3/src/main/resources/META-INF/dubbo/internal/org.apache.dubbo.registry.client.ServiceDiscoveryFactory
new file mode 100644
index 0000000..2bd11aa
--- /dev/null
+++ b/dubbo-registry-extensions/dubbo-registry-etcd3/src/main/resources/META-INF/dubbo/internal/org.apache.dubbo.registry.client.ServiceDiscoveryFactory
@@ -0,0 +1 @@
+etcd=org.apache.dubbo.registry.etcd.EtcdServiceDiscoveryFactory
diff --git a/dubbo-registry-extensions/dubbo-registry-etcd3/src/test/java/org/apache/dubbo/registry/etcd/EtcdRegistryTest.java b/dubbo-registry-extensions/dubbo-registry-etcd3/src/test/java/org/apache/dubbo/registry/etcd/EtcdRegistryTest.java
new file mode 100644
index 0000000..82e32a5
--- /dev/null
+++ b/dubbo-registry-extensions/dubbo-registry-etcd3/src/test/java/org/apache/dubbo/registry/etcd/EtcdRegistryTest.java
@@ -0,0 +1,329 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dubbo.registry.etcd;
+
+import org.apache.dubbo.common.URL;
+import org.apache.dubbo.common.extension.ExtensionLoader;
+import org.apache.dubbo.common.utils.NetUtils;
+import org.apache.dubbo.registry.NotifyListener;
+import org.apache.dubbo.registry.RegistryFactory;
+import org.apache.dubbo.registry.support.RegistryManager;
+import org.apache.dubbo.remoting.Constants;
+import org.apache.dubbo.rpc.model.ApplicationModel;
+
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Disabled;
+import org.junit.jupiter.api.Test;
+
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+
+import static org.apache.dubbo.common.constants.CommonConstants.ANY_VALUE;
+import static org.apache.dubbo.common.constants.CommonConstants.CLASSIFIER_KEY;
+import static org.apache.dubbo.common.constants.CommonConstants.ENABLED_KEY;
+import static org.apache.dubbo.common.constants.CommonConstants.GROUP_KEY;
+import static org.apache.dubbo.common.constants.CommonConstants.INTERFACE_KEY;
+import static org.apache.dubbo.common.constants.CommonConstants.VERSION_KEY;
+import static org.apache.dubbo.common.constants.RegistryConstants.CATEGORY_KEY;
+import static org.apache.dubbo.common.constants.RegistryConstants.CONFIGURATORS_CATEGORY;
+import static org.apache.dubbo.common.constants.RegistryConstants.CONSUMERS_CATEGORY;
+import static org.apache.dubbo.common.constants.RegistryConstants.PROVIDERS_CATEGORY;
+import static org.apache.dubbo.common.constants.RegistryConstants.ROUTERS_CATEGORY;
+import static org.apache.dubbo.registry.Constants.ADMIN_PROTOCOL;
+
+@Disabled
+public class EtcdRegistryTest {
+
+ String service = "org.apache.dubbo.internal.test.DemoServie";
+ String outerService = "org.apache.dubbo.outer.test.OuterDemoServie";
+ URL serviceUrl = URL.valueOf("dubbo://" + NetUtils.getLocalHost() + "/" + service + "?methods=test1,test2");
+ URL serviceUrl2 = URL.valueOf("dubbo://" + NetUtils.getLocalHost() + "/" + service + "?methods=test1,test2,test3");
+ URL serviceUrl3 = URL.valueOf("dubbo://" + NetUtils.getLocalHost() + "/" + outerService + "?methods=test1,test2");
+ URL registryUrl = URL.valueOf("etcd3://127.0.0.1:2379/org.apache.dubbo.registry.RegistryService");
+ URL consumerUrl = URL.valueOf("dubbo://" + NetUtils.getLocalHost() + ":2018" + "/" + service + "?methods=test1,test2");
+ RegistryFactory registryFactory = ExtensionLoader.getExtensionLoader(RegistryFactory.class).getExtension("etcd3", false);
+ EtcdRegistry registry;
+ URL subscribe = new URL(
+ ADMIN_PROTOCOL, NetUtils.getLocalHost(), 0, "",
+ INTERFACE_KEY, ANY_VALUE,
+ GROUP_KEY, ANY_VALUE,
+ VERSION_KEY, ANY_VALUE,
+ CLASSIFIER_KEY, ANY_VALUE,
+ CATEGORY_KEY, PROVIDERS_CATEGORY + "," + CONSUMERS_CATEGORY + "," + ROUTERS_CATEGORY + "," + CONFIGURATORS_CATEGORY,
+ ENABLED_KEY, ANY_VALUE,
+ Constants.CHECK_KEY, String.valueOf(false));
+
+ @Test
+ public void test_register() {
+
+ registry.register(serviceUrl);
+ Set<URL> registered = registry.getRegistered();
+ Assertions.assertEquals(1, registered.size());
+ Assertions.assertTrue(registered.contains(serviceUrl));
+
+ registry.unregister(serviceUrl);
+ }
+
+ @Test
+ public void test_unregister() {
+
+ registry.register(serviceUrl);
+ Set<URL> registered = registry.getRegistered();
+ Assertions.assertEquals(1, registered.size());
+ Assertions.assertTrue(registered.contains(serviceUrl));
+
+ registry.unregister(serviceUrl);
+
+ registered = registry.getRegistered();
+ Assertions.assertEquals(0, registered.size());
+ }
+
+ @Test
+ public void test_subscribe() {
+
+ registry.register(serviceUrl);
+
+ final AtomicReference<URL> notifiedUrl = new AtomicReference<URL>();
+ registry.subscribe(consumerUrl, new NotifyListener() {
+ public void notify(List<URL> urls) {
+ notifiedUrl.set(urls.get(0));
+ }
+ });
+ Assertions.assertEquals(serviceUrl.toFullString(), notifiedUrl.get().toFullString());
+ Map<URL, Set<NotifyListener>> arg = registry.getSubscribed();
+ Assertions.assertEquals(consumerUrl, arg.keySet().iterator().next());
+ }
+
+ @Test
+ public void test_subscribe_when_register() throws InterruptedException {
+
+ Assertions.assertEquals(0, registry.getRegistered().size());
+ Assertions.assertEquals(0, registry.getSubscribed().size());
+
+ CountDownLatch notNotified = new CountDownLatch(2);
+
+ final AtomicReference<URL> notifiedUrl = new AtomicReference<URL>();
+ registry.subscribe(consumerUrl, new NotifyListener() {
+ public void notify(List<URL> urls) {
+ notifiedUrl.set(urls.get(0));
+ notNotified.countDown();
+ }
+ });
+
+ registry.register(serviceUrl);
+
+ Assertions.assertTrue(notNotified.await(15, TimeUnit.SECONDS));
+
+ Assertions.assertEquals(serviceUrl.toFullString(), notifiedUrl.get().toFullString());
+ Map<URL, Set<NotifyListener>> subscribed = registry.getSubscribed();
+ Assertions.assertEquals(consumerUrl, subscribed.keySet().iterator().next());
+ }
+
+ @Test
+ public void test_subscribe_when_register0() throws InterruptedException {
+
+ Assertions.assertEquals(0, registry.getRegistered().size());
+ Assertions.assertEquals(0, registry.getSubscribed().size());
+
+ CountDownLatch notNotified = new CountDownLatch(3);
+ ConcurrentHashMap<URL, Boolean> notifiedUrls = new ConcurrentHashMap<>();
+ registry.subscribe(consumerUrl, new NotifyListener() {
+ public void notify(List<URL> urls) {
+ if (urls != null && urls.size() > 0) {
+ if (!urls.get(0).getProtocol().equals("empty")) {
+ for (Iterator<URL> iterator = urls.iterator(); iterator.hasNext(); ) {
+ notifiedUrls.put(iterator.next(), true);
+ }
+ }
+ }
+
+ notNotified.countDown();
+ }
+ });
+
+ registry.register(serviceUrl);
+ registry.register(serviceUrl2);
+
+ Assertions.assertTrue(notNotified.await(15, TimeUnit.SECONDS));
+
+ Assertions.assertTrue(notifiedUrls.containsKey(serviceUrl));
+ Assertions.assertTrue(notifiedUrls.containsKey(serviceUrl2));
+ Map<URL, Set<NotifyListener>> subscribed = registry.getSubscribed();
+ Assertions.assertEquals(consumerUrl, subscribed.keySet().iterator().next());
+ }
+
+ @Test
+ public void test_subscribe_when_register1() throws InterruptedException {
+
+ Assertions.assertEquals(0, registry.getRegistered().size());
+ Assertions.assertEquals(0, registry.getSubscribed().size());
+
+ CountDownLatch notNotified = new CountDownLatch(2);
+
+ final AtomicReference<URL> notifiedUrls = new AtomicReference<URL>();
+ registry.subscribe(consumerUrl, new NotifyListener() {
+ public void notify(List<URL> urls) {
+ notifiedUrls.set(urls.get(0));
+ notNotified.countDown();
+ }
+ });
+
+ registry.register(serviceUrl);
+ // register service3 should not trigger notify
+ registry.register(serviceUrl3);
+
+ Assertions.assertTrue(notNotified.await(15, TimeUnit.SECONDS));
+
+ Assertions.assertEquals(serviceUrl, notifiedUrls.get());
+ Map<URL, Set<NotifyListener>> subscribed = registry.getSubscribed();
+ Assertions.assertEquals(consumerUrl, subscribed.keySet().iterator().next());
+ }
+
+ @Test
+ public void test_subscribe_when_register2() throws InterruptedException {
+
+ Assertions.assertEquals(0, registry.getRegistered().size());
+ Assertions.assertEquals(0, registry.getSubscribed().size());
+
+ CountDownLatch notNotified = new CountDownLatch(3);
+
+ ConcurrentHashMap<URL, Boolean> notifiedUrls = new ConcurrentHashMap<>();
+
+ registry.subscribe(subscribe, new NotifyListener() {
+ public void notify(List<URL> urls) {
+ if (urls != null && urls.size() > 0) {
+ if (!urls.get(0).getProtocol().equals("empty")) {
+ for (Iterator<URL> iterator = urls.iterator(); iterator.hasNext(); ) {
+ notifiedUrls.put(iterator.next(), true);
+ }
+ notNotified.countDown();
+ }
+ }
+ }
+ });
+
+ registry.register(serviceUrl);
+ registry.register(serviceUrl2);
+ // service3 interface is not equals server2
+ registry.register(serviceUrl3);
+
+ Assertions.assertTrue(notNotified.await(15, TimeUnit.SECONDS));
+ Assertions.assertEquals(3, notifiedUrls.size());
+ Assertions.assertTrue(notifiedUrls.containsKey(serviceUrl));
+ Assertions.assertTrue(notifiedUrls.containsKey(serviceUrl2));
+ Assertions.assertTrue(notifiedUrls.containsKey(serviceUrl3));
+ }
+
+ @Test
+ public void test_unsubscribe() throws InterruptedException {
+
+ Assertions.assertEquals(0, registry.getRegistered().size());
+ Assertions.assertEquals(0, registry.getSubscribed().size());
+
+ CountDownLatch notNotified = new CountDownLatch(2);
+
+ final AtomicReference<URL> notifiedUrl = new AtomicReference<URL>();
+
+ NotifyListener listener = new NotifyListener() {
+ public void notify(List<URL> urls) {
+ if (urls != null) {
+ for (Iterator<URL> iterator = urls.iterator(); iterator.hasNext(); ) {
+ URL url = iterator.next();
+ if (!url.getProtocol().equals("empty")) {
+ notifiedUrl.set(url);
+ notNotified.countDown();
+ }
+ }
+ }
+ }
+ };
+ registry.subscribe(consumerUrl, listener);
+ registry.unsubscribe(consumerUrl, listener);
+
+ registry.register(serviceUrl);
+
+ Assertions.assertFalse(notNotified.await(2, TimeUnit.SECONDS));
+ // expect nothing happen
+ Assertions.assertNull(notifiedUrl.get());
+ }
+
+ @BeforeEach
+ public void setUp() {
+ registry = (EtcdRegistry) registryFactory.getRegistry(registryUrl);
+ Assertions.assertNotNull(registry);
+ if (!registry.isAvailable()) {
+ RegistryManager registryManager = ApplicationModel.defaultModel().getBeanFactory().getBean(RegistryManager.class);
+ registryManager.destroyAll();
+ registry = (EtcdRegistry) registryFactory.getRegistry(registryUrl);
+ }
+ }
+
+ @AfterEach
+ public void tearDown() {
+
+ registry.unregister(serviceUrl);
+ registry.unregister(serviceUrl2);
+ registry.unregister(serviceUrl3);
+ registry.unregister(subscribe);
+
+ registry.destroy();
+ }
+
+
+}
diff --git a/dubbo-registry-extensions/dubbo-registry-etcd3/src/test/java/org/apache/dubbo/registry/etcd/EtcdServiceDiscoveryTest.java b/dubbo-registry-extensions/dubbo-registry-etcd3/src/test/java/org/apache/dubbo/registry/etcd/EtcdServiceDiscoveryTest.java
new file mode 100644
index 0000000..50f66e6
--- /dev/null
+++ b/dubbo-registry-extensions/dubbo-registry-etcd3/src/test/java/org/apache/dubbo/registry/etcd/EtcdServiceDiscoveryTest.java
@@ -0,0 +1,124 @@
+///*
+// * Licensed to the Apache Software Foundation (ASF) under one or more
+// * contributor license agreements. See the NOTICE file distributed with
+// * this work for additional information regarding copyright ownership.
+// * The ASF licenses this file to You under the Apache License, Version 2.0
+// * (the "License"); you may not use this file except in compliance with
+// * the License. You may obtain a copy of the License at
+// *
+// * http://www.apache.org/licenses/LICENSE-2.0
+// *
+// * Unless required by applicable law or agreed to in writing, software
+// * distributed under the License is distributed on an "AS IS" BASIS,
+// * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// * See the License for the specific language governing permissions and
+// * limitations under the License.
+// */
+//package org.apache.dubbo.registry.etcd;
+//
+//import org.apache.dubbo.common.URL;
+//import org.apache.dubbo.registry.client.DefaultServiceInstance;
+//import org.apache.dubbo.registry.client.ServiceInstance;
+//
+//import com.google.gson.Gson;
+//import org.junit.jupiter.api.AfterAll;
+//import org.junit.jupiter.api.Assertions;
+//import org.junit.jupiter.api.BeforeAll;
+//import org.junit.jupiter.api.Disabled;
+//import org.junit.jupiter.api.Test;
+//
+//import java.util.ArrayList;
+//import java.util.List;
+//
+//import static java.lang.String.valueOf;
+//
+///**
+// * 2019-08-30
+// * <p>
+// * There is no embedded server. so it works depend on etcd local server.
+// */
+//@Disabled
+//public class EtcdServiceDiscoveryTest {
+//
+// static EtcdServiceDiscovery etcdServiceDiscovery;
+//
+// @BeforeAll
+// public static void setUp() throws Exception {
+// URL url = URL.valueOf("etcd3://127.0.0.1:2379/org.apache.dubbo.registry.RegistryService");
+// etcdServiceDiscovery = new EtcdServiceDiscovery();
+// Assertions.assertNull(etcdServiceDiscovery.etcdClient);
+// etcdServiceDiscovery.initialize(url);
+// }
+//
+// @AfterAll
+// public static void destroy() throws Exception {
+//// etcdServiceDiscovery.destroy();
+// }
+//
+//
+// @Test
+// public void testLifecycle() throws Exception {
+// URL url = URL.valueOf("etcd3://127.0.0.1:2233/org.apache.dubbo.registry.RegistryService");
+// EtcdServiceDiscovery etcdServiceDiscoveryTmp = new EtcdServiceDiscovery();
+// Assertions.assertNull(etcdServiceDiscoveryTmp.etcdClient);
+// etcdServiceDiscoveryTmp.initialize(url);
+// Assertions.assertNotNull(etcdServiceDiscoveryTmp.etcdClient);
+// Assertions.assertTrue(etcdServiceDiscoveryTmp.etcdClient.isConnected());
+// etcdServiceDiscoveryTmp.destroy();
+// Assertions.assertFalse(etcdServiceDiscoveryTmp.etcdClient.isConnected());
+// }
+//
+// @Test
+// public void testRegistry() throws Exception {
+// ServiceInstance serviceInstance = new DefaultServiceInstance(valueOf(System.nanoTime()), "EtcdTestService", "127.0.0.1", 8080);
+// Assertions.assertNull(etcdServiceDiscovery.etcdClient.getKVValue(etcdServiceDiscovery.toPath(serviceInstance)));
+// etcdServiceDiscovery.register(serviceInstance);
+// Assertions.assertNotNull(etcdServiceDiscovery.etcdClient.getKVValue(etcdServiceDiscovery.toPath(serviceInstance)));
+// }
+//
+// @Test
+// public void testUnRegistry() throws Exception {
+// ServiceInstance serviceInstance = new DefaultServiceInstance(valueOf(System.nanoTime()), "EtcdTest2Service", "127.0.0.1", 8080);
+// Assertions.assertNull(etcdServiceDiscovery.etcdClient.getKVValue(etcdServiceDiscovery.toPath(serviceInstance)));
+// etcdServiceDiscovery.register(serviceInstance);
+// Assertions.assertNotNull(etcdServiceDiscovery.etcdClient.getKVValue(etcdServiceDiscovery.toPath(serviceInstance)));
+// etcdServiceDiscovery.unregister(serviceInstance);
+// Assertions.assertNull(etcdServiceDiscovery.etcdClient.getKVValue(etcdServiceDiscovery.toPath(serviceInstance)));
+// }
+//
+// @Test
+// public void testUpdate() throws Exception {
+// DefaultServiceInstance serviceInstance = new DefaultServiceInstance(valueOf(System.nanoTime()), "EtcdTest34Service", "127.0.0.1", 8080);
+// Assertions.assertNull(etcdServiceDiscovery.etcdClient.getKVValue(etcdServiceDiscovery.toPath(serviceInstance)));
+// etcdServiceDiscovery.register(serviceInstance);
+// Assertions.assertNotNull(etcdServiceDiscovery.etcdClient.getKVValue(etcdServiceDiscovery.toPath(serviceInstance)));
+// Assertions.assertEquals(etcdServiceDiscovery.etcdClient.getKVValue(etcdServiceDiscovery.toPath(serviceInstance)),
+// new Gson().toJson(serviceInstance));
+// serviceInstance.setPort(9999);
+// etcdServiceDiscovery.update(serviceInstance);
+// Assertions.assertNotNull(etcdServiceDiscovery.etcdClient.getKVValue(etcdServiceDiscovery.toPath(serviceInstance)));
+// Assertions.assertEquals(etcdServiceDiscovery.etcdClient.getKVValue(etcdServiceDiscovery.toPath(serviceInstance)),
+// new Gson().toJson(serviceInstance));
+// }
+//
+// @Test
+// public void testGetInstances() throws Exception {
+// String serviceName = "EtcdTest77Service";
+// Assertions.assertTrue(etcdServiceDiscovery.getInstances(serviceName).isEmpty());
+// etcdServiceDiscovery.register(new DefaultServiceInstance(valueOf(System.nanoTime()), serviceName, "127.0.0.1", 8080));
+// etcdServiceDiscovery.register(new DefaultServiceInstance(valueOf(System.nanoTime()), serviceName, "127.0.0.1", 9809));
+// Assertions.assertFalse(etcdServiceDiscovery.getInstances(serviceName).isEmpty());
+// List<String> r = convertToIpPort(etcdServiceDiscovery.getInstances(serviceName));
+// Assertions.assertTrue(r.contains("127.0.0.1:8080"));
+// Assertions.assertTrue(r.contains("127.0.0.1:9809"));
+// }
+//
+// private List<String> convertToIpPort(List<ServiceInstance> serviceInstances) {
+// List<String> result = new ArrayList<>();
+// for (ServiceInstance serviceInstance : serviceInstances) {
+// result.add(serviceInstance.getHost() + ":" + serviceInstance.getPort());
+// }
+// return result;
+// }
+//
+//}
diff --git a/dubbo-registry-extensions/dubbo-registry-redis/pom.xml b/dubbo-registry-extensions/dubbo-registry-redis/pom.xml
new file mode 100644
index 0000000..665f6aa
--- /dev/null
+++ b/dubbo-registry-extensions/dubbo-registry-redis/pom.xml
@@ -0,0 +1,59 @@
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+ -->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+ <parent>
+ <artifactId>dubbo-registry-extensions</artifactId>
+ <groupId>org.apache.dubbo.extensions</groupId>
+ <version>${revision}</version>
+ <relativePath>../pom.xml</relativePath>
+ </parent>
+ <modelVersion>4.0.0</modelVersion>
+
+ <version>1.0.0-SNAPSHOT</version>
+ <artifactId>dubbo-registry-redis</artifactId>
+ <packaging>jar</packaging>
+ <name>${project.artifactId}</name>
+ <description>The redis registry module of dubbo project</description>
+ <properties>
+ <skip_maven_deploy>false</skip_maven_deploy>
+ </properties>
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.dubbo</groupId>
+ <artifactId>dubbo-registry-api</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.dubbo.extensions</groupId>
+ <artifactId>dubbo-remoting-redis</artifactId>
+ <version>1.0.0-SNAPSHOT</version>
+ </dependency>
+ <dependency>
+ <groupId>redis.clients</groupId>
+ <artifactId>jedis</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>com.github.codemonstur</groupId>
+ <artifactId>embedded-redis</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.commons</groupId>
+ <artifactId>commons-lang3</artifactId>
+ <scope>test</scope>
+ </dependency>
+ </dependencies>
+</project>
diff --git a/dubbo-registry-extensions/dubbo-registry-redis/src/main/java/org/apache/dubbo/registry/redis/RedisRegistry.java b/dubbo-registry-extensions/dubbo-registry-redis/src/main/java/org/apache/dubbo/registry/redis/RedisRegistry.java
new file mode 100644
index 0000000..a65d6a7
--- /dev/null
+++ b/dubbo-registry-extensions/dubbo-registry-redis/src/main/java/org/apache/dubbo/registry/redis/RedisRegistry.java
@@ -0,0 +1,519 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dubbo.registry.redis;
+
+import org.apache.dubbo.common.URL;
+import org.apache.dubbo.common.URLBuilder;
+import org.apache.dubbo.common.utils.CollectionUtils;
+import org.apache.dubbo.common.utils.ExecutorUtil;
+import org.apache.dubbo.common.utils.NamedThreadFactory;
+import org.apache.dubbo.common.utils.UrlUtils;
+import org.apache.dubbo.registry.NotifyListener;
+import org.apache.dubbo.registry.support.FailbackRegistry;
+import org.apache.dubbo.remoting.redis.RedisClient;
+import org.apache.dubbo.remoting.redis.jedis.ClusterRedisClient;
+import org.apache.dubbo.remoting.redis.jedis.MonoRedisClient;
+import org.apache.dubbo.remoting.redis.jedis.SentinelRedisClient;
+import org.apache.dubbo.rpc.RpcException;
+
+import redis.clients.jedis.JedisPubSub;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static org.apache.dubbo.common.constants.CommonConstants.ANYHOST_VALUE;
+import static org.apache.dubbo.common.constants.CommonConstants.ANY_VALUE;
+import static org.apache.dubbo.common.constants.CommonConstants.GROUP_KEY;
+import static org.apache.dubbo.common.constants.CommonConstants.PATH_SEPARATOR;
+import static org.apache.dubbo.common.constants.RegistryConstants.CATEGORY_KEY;
+import static org.apache.dubbo.common.constants.RegistryConstants.DEFAULT_CATEGORY;
+import static org.apache.dubbo.common.constants.RegistryConstants.DYNAMIC_KEY;
+import static org.apache.dubbo.common.constants.RegistryConstants.EMPTY_PROTOCOL;
+import static org.apache.dubbo.registry.Constants.DEFAULT_REGISTRY_RECONNECT_PERIOD;
+import static org.apache.dubbo.registry.Constants.DEFAULT_SESSION_TIMEOUT;
+import static org.apache.dubbo.registry.Constants.REGISTER;
+import static org.apache.dubbo.registry.Constants.REGISTRY_RECONNECT_PERIOD_KEY;
+import static org.apache.dubbo.registry.Constants.SESSION_TIMEOUT_KEY;
+import static org.apache.dubbo.registry.Constants.UNREGISTER;
+
+/**
+ * RedisRegistry
+ */
+public class RedisRegistry extends FailbackRegistry {
+
+ private static final String REDIS_CLIENT_KEY = "redis-client";
+
+ private static final String MONO_REDIS = "mono";
+
+ private static final String SENTINEL_REDIS = "sentinel";
+
+ private static final String CLUSTER_REDIS = "cluster";
+ private static final String DEFAULT_ROOT = "dubbo";
+
+ private final ScheduledExecutorService expireExecutor = Executors.newScheduledThreadPool(1, new NamedThreadFactory("DubboRegistryExpireTimer", true));
+
+ private final ScheduledFuture<?> expireFuture;
+
+ private final String root;
+
+ private RedisClient redisClient;
+
+ private final ConcurrentMap<String, Notifier> notifiers = new ConcurrentHashMap<>();
+
+ private final int reconnectPeriod;
+
+ private final int expirePeriod;
+
+ private volatile boolean admin = false;
+
+ private final Map<URL, Long> expireCache = new ConcurrentHashMap<>();
+
+ // just for unit test
+ private volatile boolean doExpire = true;
+
+ public RedisRegistry(URL url) {
+ super(url);
+ String type = url.getParameter(REDIS_CLIENT_KEY, MONO_REDIS);
+ if (SENTINEL_REDIS.equals(type)) {
+ redisClient = new SentinelRedisClient(url);
+ } else if (CLUSTER_REDIS.equals(type)) {
+ redisClient = new ClusterRedisClient(url);
+ } else {
+ redisClient = new MonoRedisClient(url);
+ }
+
+ if (url.isAnyHost()) {
+ throw new IllegalStateException("registry address == null");
+ }
+
+ this.reconnectPeriod = url.getParameter(REGISTRY_RECONNECT_PERIOD_KEY, DEFAULT_REGISTRY_RECONNECT_PERIOD);
+ String group = url.getParameter(GROUP_KEY, DEFAULT_ROOT);
+ if (!group.startsWith(PATH_SEPARATOR)) {
+ group = PATH_SEPARATOR + group;
+ }
+ if (!group.endsWith(PATH_SEPARATOR)) {
+ group = group + PATH_SEPARATOR;
+ }
+ this.root = group;
+
+ this.expirePeriod = url.getParameter(SESSION_TIMEOUT_KEY, DEFAULT_SESSION_TIMEOUT);
+ this.expireFuture = expireExecutor.scheduleWithFixedDelay(() -> {
+ try {
+ deferExpired(); // Extend the expiration time
+ } catch (Throwable t) { // Defensive fault tolerance
+ logger.error("Unexpected exception occur at defer expire time, cause: " + t.getMessage(), t);
+ }
+ }, expirePeriod / 2, expirePeriod / 2, TimeUnit.MILLISECONDS);
+ }
+
+ private void deferExpired() {
+ for (URL url : new HashSet<>(getRegistered())) {
+ if (url.getParameter(DYNAMIC_KEY, true)) {
+ String key = toCategoryPath(url);
+ if (redisClient.hset(key, url.toFullString(), String.valueOf(System.currentTimeMillis() + expirePeriod)) == 1) {
+ redisClient.publish(key, REGISTER);
+ }
+ }
+ }
+
+ if (doExpire) {
+ for (Map.Entry<URL, Long> expireEntry : expireCache.entrySet()) {
+ if (expireEntry.getValue() < System.currentTimeMillis()) {
+ doNotify(toCategoryPath(expireEntry.getKey()));
+ }
+ }
+ }
+
+ if (admin) {
+ clean();
+ }
+ }
+
+ private void clean() {
+ Set<String> keys = redisClient.scan(root + ANY_VALUE);
+ if (CollectionUtils.isNotEmpty(keys)) {
+ for (String key : keys) {
+ Map<String, String> values = redisClient.hgetAll(key);
+ if (CollectionUtils.isNotEmptyMap(values)) {
+ boolean delete = false;
+ long now = System.currentTimeMillis();
+ for (Map.Entry<String, String> entry : values.entrySet()) {
+ URL url = URL.valueOf(entry.getKey());
+ if (url.getParameter(DYNAMIC_KEY, true)) {
+ long expire = Long.parseLong(entry.getValue());
+ if (expire < now) {
+ redisClient.hdel(key, entry.getKey());
+ delete = true;
+ if (logger.isWarnEnabled()) {
+ logger.warn("Delete expired key: " + key + " -> value: " + entry.getKey() + ", expire: " + new Date(expire) + ", now: " + new Date(now));
+ }
+ }
+ }
+ }
+ if (delete) {
+ redisClient.publish(key, UNREGISTER);
+ }
+ }
+ }
+ }
+ }
+
+ @Override
+ public boolean isAvailable() {
+ return redisClient.isConnected();
+ }
+
+ @Override
+ public void destroy() {
+ super.destroy();
+ try {
+ expireFuture.cancel(true);
+ } catch (Throwable t) {
+ logger.warn(t.getMessage(), t);
+ }
+ try {
+ for (Notifier notifier : notifiers.values()) {
+ notifier.shutdown();
+ }
+ } catch (Throwable t) {
+ logger.warn(t.getMessage(), t);
+ }
+ try {
+ redisClient.destroy();
+ } catch (Throwable t) {
+ logger.warn("Failed to destroy the redis registry client. registry: " + getUrl().getAddress() + ", cause: " + t.getMessage(), t);
+ }
+ ExecutorUtil.gracefulShutdown(expireExecutor, expirePeriod);
+ }
+
+ @Override
+ public void doRegister(URL url) {
+ String key = toCategoryPath(url);
+ String value = url.toFullString();
+ String expire = String.valueOf(System.currentTimeMillis() + expirePeriod);
+ try {
+ redisClient.hset(key, value, expire);
+ redisClient.publish(key, REGISTER);
+ } catch (Throwable t) {
+ throw new RpcException("Failed to register service to redis registry. registry: " + url.getAddress() + ", service: " + url + ", cause: " + t.getMessage(), t);
+ }
+ }
+
+ @Override
+ public void doUnregister(URL url) {
+ String key = toCategoryPath(url);
+ String value = url.toFullString();
+ try {
+ redisClient.hdel(key, value);
+ redisClient.publish(key, UNREGISTER);
+ } catch (Throwable t) {
+ throw new RpcException("Failed to unregister service to redis registry. registry: " + url.getAddress() + ", service: " + url + ", cause: " + t.getMessage(), t);
+ }
+ }
+
+ @Override
+ public void doSubscribe(final URL url, final NotifyListener listener) {
+ String service = toServicePath(url);
+ Notifier notifier = notifiers.get(service);
+ if (notifier == null) {
+ Notifier newNotifier = new Notifier(service);
+ notifiers.putIfAbsent(service, newNotifier);
+ notifier = notifiers.get(service);
+ if (notifier == newNotifier) {
+ notifier.start();
+ }
+ }
+ try {
+ if (service.endsWith(ANY_VALUE)) {
+ admin = true;
+ Set<String> keys = redisClient.scan(service);
+ if (CollectionUtils.isNotEmpty(keys)) {
+ Map<String, Set<String>> serviceKeys = new HashMap<>();
+ for (String key : keys) {
+ String serviceKey = toServicePath(key);
+ Set<String> sk = serviceKeys.computeIfAbsent(serviceKey, k -> new HashSet<>());
+ sk.add(key);
+ }
+ for (Set<String> sk : serviceKeys.values()) {
+ doNotify(sk, url, Collections.singletonList(listener));
+ }
+ }
+ } else {
+ doNotify(redisClient.scan(service + PATH_SEPARATOR + ANY_VALUE), url, Collections.singletonList(listener));
+ }
+ } catch (Throwable t) {
+ throw new RpcException("Failed to subscribe service from redis registry. registry: " + url.getAddress() + ", service: " + url + ", cause: " + t.getMessage(), t);
+ }
+ }
+
+ @Override
+ public void doUnsubscribe(URL url, NotifyListener listener) {
+ }
+
+ private void doNotify(String key) {
+ for (Map.Entry<URL, Set<NotifyListener>> entry : new HashMap<>(getSubscribed()).entrySet()) {
+ doNotify(Collections.singletonList(key), entry.getKey(), new HashSet<>(entry.getValue()));
+ }
+ }
+
+ private void doNotify(Collection<String> keys, URL url, Collection<NotifyListener> listeners) {
+ if (keys == null || keys.isEmpty()
+ || listeners == null || listeners.isEmpty()) {
+ return;
+ }
+ long now = System.currentTimeMillis();
+ List<URL> result = new ArrayList<>();
+ List<String> categories = Arrays.asList(url.getParameter(CATEGORY_KEY, new String[0]));
+ String consumerService = url.getServiceInterface();
+ for (String key : keys) {
+ if (!ANY_VALUE.equals(consumerService)) {
+ String providerService = toServiceName(key);
+ if (!providerService.equals(consumerService)) {
+ continue;
+ }
+ }
+ String category = toCategoryName(key);
+ if (!categories.contains(ANY_VALUE) && !categories.contains(category)) {
+ continue;
+ }
+ List<URL> urls = new ArrayList<>();
+ Set<URL> toDeleteExpireKeys = new HashSet<>(expireCache.keySet());
+ Map<String, String> values = redisClient.hgetAll(key);
+ if (CollectionUtils.isNotEmptyMap(values)) {
+ for (Map.Entry<String, String> entry : values.entrySet()) {
+ URL u = URL.valueOf(entry.getKey());
+ long expire = Long.parseLong(entry.getValue());
+ if (!u.getParameter(DYNAMIC_KEY, true)
+ || expire >= now) {
+ if (UrlUtils.isMatch(url, u)) {
+ urls.add(u);
+ expireCache.put(u, expire);
+ toDeleteExpireKeys.remove(u);
+ }
+ }
+ }
+ }
+
+ if (!toDeleteExpireKeys.isEmpty()) {
+ for (URL u : toDeleteExpireKeys) {
+ expireCache.remove(u);
+ }
+ }
+ if (urls.isEmpty()) {
+ urls.add(URLBuilder.from(url)
+ .setProtocol(EMPTY_PROTOCOL)
+ .setAddress(ANYHOST_VALUE)
+ .setPath(toServiceName(key))
+ .addParameter(CATEGORY_KEY, category)
+ .build());
+ }
+ result.addAll(urls);
+
+ if (logger.isInfoEnabled()) {
+ logger.info("redis notify: " + key + " = " + urls);
+ }
+ }
+ if (CollectionUtils.isEmpty(result)) {
+ return;
+ }
+ for (NotifyListener listener : listeners) {
+ notify(url, listener, result);
+ }
+ }
+
+ private String toServiceName(String categoryPath) {
+ String servicePath = toServicePath(categoryPath);
+ return servicePath.startsWith(root) ? servicePath.substring(root.length()) : servicePath;
+ }
+
+ private String toCategoryName(String categoryPath) {
+ int i = categoryPath.lastIndexOf(PATH_SEPARATOR);
+ return i > 0 ? categoryPath.substring(i + 1) : categoryPath;
+ }
+
+ private String toServicePath(String categoryPath) {
+ int i;
+ if (categoryPath.startsWith(root)) {
+ i = categoryPath.indexOf(PATH_SEPARATOR, root.length());
+ } else {
+ i = categoryPath.indexOf(PATH_SEPARATOR);
+ }
+ return i > 0 ? categoryPath.substring(0, i) : categoryPath;
+ }
+
+ private String toServicePath(URL url) {
+ return root + url.getServiceInterface();
+ }
+
+ private String toCategoryPath(URL url) {
+ return toServicePath(url) + PATH_SEPARATOR + url.getParameter(CATEGORY_KEY, DEFAULT_CATEGORY);
+ }
+
+ private class NotifySub extends JedisPubSub {
+ public NotifySub() {}
+
+ @Override
+ public void onMessage(String key, String msg) {
+ if (logger.isInfoEnabled()) {
+ logger.info("redis event: " + key + " = " + msg);
+ }
+ if (msg.equals(REGISTER)
+ || msg.equals(UNREGISTER)) {
+ try {
+ doNotify(key);
+ } catch (Throwable t) { // TODO Notification failure does not restore mechanism guarantee
+ logger.error(t.getMessage(), t);
+ }
+ }
+ }
+
+ @Override
+ public void onPMessage(String pattern, String key, String msg) {
+ onMessage(key, msg);
+ }
+
+ @Override
+ public void onSubscribe(String key, int num) {
+ }
+
+ @Override
+ public void onPSubscribe(String pattern, int num) {
+ }
+
+ @Override
+ public void onUnsubscribe(String key, int num) {
+ }
+
+ @Override
+ public void onPUnsubscribe(String pattern, int num) {
+ }
+
+ }
+
+ private class Notifier extends Thread {
+
+ private final String service;
+ private final AtomicInteger connectSkip = new AtomicInteger();
+ private final AtomicInteger connectSkipped = new AtomicInteger();
+
+ private volatile boolean first = true;
+ private volatile boolean running = true;
+ private volatile int connectRandom;
+
+ public Notifier(String service) {
+ super.setDaemon(true);
+ super.setName("DubboRedisSubscribe");
+ this.service = service;
+ }
+
+ private void resetSkip() {
+ connectSkip.set(0);
+ connectSkipped.set(0);
+ connectRandom = 0;
+ }
+
+ private boolean isSkip() {
+ int skip = connectSkip.get(); // Growth of skipping times
+ if (skip >= 10) { // If the number of skipping times increases by more than 10, take the random number
+ if (connectRandom == 0) {
+ connectRandom = ThreadLocalRandom.current().nextInt(10);
+ }
+ skip = 10 + connectRandom;
+ }
+ if (connectSkipped.getAndIncrement() < skip) { // Check the number of skipping times
+ return true;
+ }
+ connectSkip.incrementAndGet();
+ connectSkipped.set(0);
+ connectRandom = 0;
+ return false;
+ }
+
+ @Override
+ public void run() {
+ while (running) {
+ try {
+ if (!isSkip()) {
+ try {
+ if (!redisClient.isConnected()) {
+ continue;
+ }
+ try {
+ if (service.endsWith(ANY_VALUE)) {
+ if (first) {
+ first = false;
+ Set<String> keys = redisClient.scan(service);
+ if (CollectionUtils.isNotEmpty(keys)) {
+ for (String s : keys) {
+ doNotify(s);
+ }
+ }
+ resetSkip();
+ }
+ redisClient.psubscribe(new NotifySub(), service);
+ } else {
+ if (first) {
+ first = false;
+ doNotify(service);
+ resetSkip();
+ }
+ redisClient.psubscribe(new NotifySub(), service + PATH_SEPARATOR + ANY_VALUE); // blocking
+ }
+ } catch (Throwable t) { // Retry another server
+ logger.warn("Failed to subscribe service from redis registry. registry: " + getUrl().getAddress() + ", cause: " + t.getMessage(), t);
+ // If you only have a single redis, you need to take a rest to avoid overtaking a lot of CPU resources
+ sleep(reconnectPeriod);
+ }
+ } catch (Throwable t) {
+ logger.error(t.getMessage(), t);
+ sleep(reconnectPeriod);
+ }
+ }
+ } catch (Throwable t) {
+ logger.error(t.getMessage(), t);
+ }
+ }
+ }
+
+ public void shutdown() {
+ try {
+ running = false;
+ redisClient.disconnect();
+ } catch (Throwable t) {
+ logger.warn(t.getMessage(), t);
+ }
+ }
+
+ }
+
+}
diff --git a/dubbo-registry-extensions/dubbo-registry-redis/src/main/java/org/apache/dubbo/registry/redis/RedisRegistryFactory.java b/dubbo-registry-extensions/dubbo-registry-redis/src/main/java/org/apache/dubbo/registry/redis/RedisRegistryFactory.java
new file mode 100644
index 0000000..ec1b5ea
--- /dev/null
+++ b/dubbo-registry-extensions/dubbo-registry-redis/src/main/java/org/apache/dubbo/registry/redis/RedisRegistryFactory.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dubbo.registry.redis;
+
+import org.apache.dubbo.common.URL;
+import org.apache.dubbo.registry.Registry;
+import org.apache.dubbo.registry.support.AbstractRegistryFactory;
+
+/**
+ * RedisRegistryFactory
+ *
+ */
+public class RedisRegistryFactory extends AbstractRegistryFactory {
+
+ @Override
+ protected Registry createRegistry(URL url) {
+ return new RedisRegistry(url);
+ }
+
+}
diff --git a/dubbo-registry-extensions/dubbo-registry-redis/src/main/resources/META-INF/dubbo/internal/org.apache.dubbo.registry.RegistryFactory b/dubbo-registry-extensions/dubbo-registry-redis/src/main/resources/META-INF/dubbo/internal/org.apache.dubbo.registry.RegistryFactory
new file mode 100644
index 0000000..3214101
--- /dev/null
+++ b/dubbo-registry-extensions/dubbo-registry-redis/src/main/resources/META-INF/dubbo/internal/org.apache.dubbo.registry.RegistryFactory
@@ -0,0 +1 @@
+redis=org.apache.dubbo.registry.redis.RedisRegistryFactory
\ No newline at end of file
diff --git a/dubbo-registry-extensions/dubbo-registry-redis/src/test/java/org/apache/dubbo/registry/redis/RedisRegistryTest.java b/dubbo-registry-extensions/dubbo-registry-redis/src/test/java/org/apache/dubbo/registry/redis/RedisRegistryTest.java
new file mode 100644
index 0000000..303971d
--- /dev/null
+++ b/dubbo-registry-extensions/dubbo-registry-redis/src/test/java/org/apache/dubbo/registry/redis/RedisRegistryTest.java
@@ -0,0 +1,250 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dubbo.registry.redis;
+
+import org.apache.dubbo.common.URL;
+import org.apache.dubbo.common.utils.NetUtils;
+import org.apache.dubbo.registry.NotifyListener;
+import org.apache.dubbo.registry.Registry;
+import org.apache.dubbo.registry.support.AbstractRegistry;
+
+import org.apache.commons.lang3.SystemUtils;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import redis.clients.jedis.exceptions.JedisConnectionException;
+import redis.embedded.RedisServer;
+
+import java.io.IOException;
+import java.lang.reflect.Field;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.Set;
+
+import static org.apache.dubbo.common.constants.RemotingConstants.BACKUP_KEY;
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static redis.embedded.RedisServer.newRedisServer;
+
+public class RedisRegistryTest {
+
+ private static final String SERVICE = "org.apache.dubbo.test.injvmServie";
+ private static final URL SERVICE_URL = URL.valueOf("redis://redis/" + SERVICE + "?notify=false&methods=test1,test2&category=providers,configurators,routers");
+ private static final URL PROVIDER_URL_A = URL.valueOf("redis://127.0.0.1:20880/" + SERVICE + "?notify=false&methods=test1,test2");
+ private static final URL PROVIDER_URL_B = URL.valueOf("redis://127.0.0.1:20881/" + SERVICE + "?notify=false&methods=test1,test2");
+
+ private RedisServer redisServer;
+ private RedisRegistry redisRegistry;
+ private URL registryUrl;
+
+ @BeforeEach
+ public void setUp() throws Exception {
+ int redisPort = 0;
+ IOException exception = null;
+
+ for (int i = 0; i < 10; i++) {
+ try {
+ redisPort = NetUtils.getAvailablePort(30000 + new Random().nextInt(10000));
+ redisServer = newRedisServer()
+ .port(redisPort)
+ // set maxheap to fix Windows error 0x70 while starting redis
+ .settingIf(SystemUtils.IS_OS_WINDOWS, "maxheap 128mb")
+ .build();
+ this.redisServer.start();
+ exception = null;
+ } catch (IOException e) {
+ e.printStackTrace();
+ exception = e;
+ }
+ if (exception == null) {
+ break;
+ }
+ }
+ Assertions.assertNull(exception);
+ registryUrl = URL.valueOf("redis://localhost:" + redisPort + "?session=4000");
+ redisRegistry = (RedisRegistry) new RedisRegistryFactory().createRegistry(registryUrl);
+ }
+
+ @AfterEach
+ public void tearDown() throws Exception {
+ this.redisServer.stop();
+ }
+
+ @Test
+ public void testRegister() {
+ Set<URL> registered = null;
+
+ for (int i = 0; i < 2; i++) {
+ redisRegistry.register(SERVICE_URL);
+ registered = redisRegistry.getRegistered();
+ assertThat(registered.contains(SERVICE_URL), is(true));
+ }
+
+ registered = redisRegistry.getRegistered();
+ assertThat(registered.size(), is(1));
+ }
+
+ @Test
+ public void testAnyHost() {
+ assertThrows(IllegalStateException.class, () -> {
+ URL errorUrl = URL.valueOf("multicast://0.0.0.0/");
+ new RedisRegistryFactory().createRegistry(errorUrl);
+ });
+ }
+
+ @Test
+ public void testSubscribeExpireCache() throws Exception {
+ redisRegistry.register(PROVIDER_URL_A);
+ redisRegistry.register(PROVIDER_URL_B);
+
+ NotifyListener listener = new NotifyListener() {
+ @Override
+ public void notify(List<URL> urls) {
+ }
+ };
+
+ redisRegistry.subscribe(SERVICE_URL, listener);
+
+ Field expireCache = RedisRegistry.class.getDeclaredField("expireCache");
+ expireCache.setAccessible(true);
+ Map<URL, Long> cacheExpire = (Map<URL, Long>)expireCache.get(redisRegistry);
+
+ assertThat(cacheExpire.get(PROVIDER_URL_A) > 0, is(true));
+ assertThat(cacheExpire.get(PROVIDER_URL_B) > 0, is(true));
+
+ redisRegistry.unregister(PROVIDER_URL_A);
+
+ boolean success = false;
+
+ for (int i = 0; i < 30; i++) {
+ cacheExpire = (Map<URL, Long>)expireCache.get(redisRegistry);
+ if (cacheExpire.get(PROVIDER_URL_A) == null) {
+ success = true;
+ break;
+ }
+ Thread.sleep(500);
+ }
+ assertThat(success, is(true));
+ }
+
+ @Test
+ public void testSubscribeWhenProviderCrash() throws Exception {
+
+ // unit test will fail if doExpire=false
+ // Field doExpireField = RedisRegistry.class.getDeclaredField("doExpire");
+ // doExpireField.setAccessible(true);
+ // doExpireField.set(redisRegistry, false);
+
+ redisRegistry.register(PROVIDER_URL_A);
+ redisRegistry.register(PROVIDER_URL_B);
+ assertThat(redisRegistry.getRegistered().contains(PROVIDER_URL_A), is(true));
+ assertThat(redisRegistry.getRegistered().contains(PROVIDER_URL_B), is(true));
+
+ Set<URL> notifiedUrls = new HashSet<>();
+ Object lock = new Object();
+
+ NotifyListener listener = new NotifyListener() {
+ @Override
+ public void notify(List<URL> urls) {
+ synchronized (lock) {
+ notifiedUrls.clear();
+ notifiedUrls.addAll(urls);
+ }
+ }
+ };
+
+ redisRegistry.subscribe(SERVICE_URL, listener);
+ assertThat(redisRegistry.getSubscribed().size(), is(1));
+
+ boolean firstOk = false;
+ boolean secondOk = false;
+
+ for (int i = 0; i < 30; i++) {
+ synchronized (lock) {
+ if (notifiedUrls.contains(PROVIDER_URL_A) && notifiedUrls.contains(PROVIDER_URL_B)) {
+ firstOk = true;
+ break;
+ }
+ }
+ Thread.sleep(500);
+ }
+
+ assertThat(firstOk, is(true));
+
+ // kill -9 to providerB
+ Field registeredField = AbstractRegistry.class.getDeclaredField("registered");
+ registeredField.setAccessible(true);
+ ((Set<URL>) registeredField.get(redisRegistry)).remove(PROVIDER_URL_B);
+
+ for (int i = 0; i < 30; i++) {
+ synchronized (lock) {
+ if (notifiedUrls.contains(PROVIDER_URL_A) && notifiedUrls.size() == 1) {
+ secondOk = true;
+ break;
+ }
+ }
+ Thread.sleep(500);
+ }
+ assertThat(secondOk, is(true));
+ }
+
+ @Test
+ public void testSubscribeAndUnsubscribe() {
+ NotifyListener listener = new NotifyListener() {
+ @Override
+ public void notify(List<URL> urls) {
+
+ }
+ };
+ redisRegistry.subscribe(SERVICE_URL, listener);
+
+ Map<URL, Set<NotifyListener>> subscribed = redisRegistry.getSubscribed();
+ assertThat(subscribed.size(), is(1));
+ assertThat(subscribed.get(SERVICE_URL).size(), is(1));
+
+ redisRegistry.unsubscribe(SERVICE_URL, listener);
+ subscribed = redisRegistry.getSubscribed();
+ assertThat(subscribed.get(SERVICE_URL).size(), is(0));
+ }
+
+ @Test
+ public void testAvailable() {
+ redisRegistry.register(SERVICE_URL);
+ assertThat(redisRegistry.isAvailable(), is(true));
+
+ redisRegistry.destroy();
+ assertThrows(JedisConnectionException.class, () -> redisRegistry.isAvailable());
+ }
+
+ @Test
+ public void testAvailableWithBackup() {
+ URL url = URL.valueOf("redis://redisOne:8880").addParameter(BACKUP_KEY, "redisTwo:8881");
+ Registry registry = new RedisRegistryFactory().createRegistry(url);
+
+ Registry finalRegistry = registry;
+ assertThrows(JedisConnectionException.class, () -> finalRegistry.isAvailable());
+
+ url = URL.valueOf(this.registryUrl.toFullString()).addParameter(BACKUP_KEY, "redisTwo:8881");
+ registry = new RedisRegistryFactory().createRegistry(url);
+
+ assertThat(registry.isAvailable(), is(true));
+ }
+}
diff --git a/dubbo-registry-extensions/dubbo-registry-sofa/pom.xml b/dubbo-registry-extensions/dubbo-registry-sofa/pom.xml
new file mode 100644
index 0000000..4a4ec00
--- /dev/null
+++ b/dubbo-registry-extensions/dubbo-registry-sofa/pom.xml
@@ -0,0 +1,130 @@
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+ -->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <parent>
+ <artifactId>dubbo-registry-extensions</artifactId>
+ <groupId>org.apache.dubbo.extensions</groupId>
+ <version>${revision}</version>
+ <relativePath>../pom.xml</relativePath>
+ </parent>
+ <modelVersion>4.0.0</modelVersion>
+
+ <version>1.0.0-SNAPSHOT</version>
+ <artifactId>dubbo-registry-sofa</artifactId>
+ <name>${project.artifactId}</name>
+ <description>The SOFARegistry module of Dubbo project</description>
+
+ <properties>
+ <javax.ws.rs.version>2.1</javax.ws.rs.version>
+ <argline>-Dnetwork_interface_denylist=docker0</argline>
+ </properties>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.dubbo</groupId>
+ <artifactId>dubbo-registry-api</artifactId>
+ <optional>true</optional>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.dubbo</groupId>
+ <artifactId>dubbo-common</artifactId>
+ <optional>true</optional>
+ </dependency>
+
+ <dependency>
+ <groupId>com.alipay.sofa</groupId>
+ <artifactId>registry-client-all</artifactId>
+ <exclusions>
+ <exclusion>
+ <groupId>com.alipay.sofa</groupId>
+ <artifactId>sofa-common-tools</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.dubbo</groupId>
+ <artifactId>dubbo-rpc-dubbo</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.dubbo</groupId>
+ <artifactId>dubbo-remoting-netty4</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.dubbo</groupId>
+ <artifactId>dubbo-serialization-hessian2</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>ch.qos.logback</groupId>
+ <artifactId>logback-classic</artifactId>
+ <scope>test</scope>
+ </dependency>
+
+ <!-- test modules -->
+ <dependency>
+ <groupId>com.alipay.sofa</groupId>
+ <artifactId>registry-test</artifactId>
+ <scope>test</scope>
+ <exclusions>
+ <exclusion>
+ <artifactId>log4j-over-slf4j</artifactId>
+ <groupId>org.slf4j</groupId>
+ </exclusion>
+ <exclusion>
+ <artifactId>log4j-jcl</artifactId>
+ <groupId>org.apache.logging.log4j</groupId>
+ </exclusion>
+ <exclusion>
+ <artifactId>log4j-core</artifactId>
+ <groupId>org.apache.logging.log4j</groupId>
+ </exclusion>
+ <exclusion>
+ <artifactId>log4j-api</artifactId>
+ <groupId>org.apache.logging.log4j</groupId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+
+ <dependency>
+ <groupId>org.jboss.resteasy</groupId>
+ <artifactId>resteasy-jaxrs</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.jboss.resteasy</groupId>
+ <artifactId>resteasy-netty4</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>javax.ws.rs</groupId>
+ <artifactId>javax.ws.rs-api</artifactId>
+ <version>${javax.ws.rs.version}</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>javax.xml.bind</groupId>
+ <artifactId>jaxb-api</artifactId>
+ <scope>test</scope>
+ </dependency>
+
+ </dependencies>
+</project>
diff --git a/dubbo-registry-extensions/dubbo-registry-sofa/src/main/java/org/apache/dubbo/registry/sofa/SofaRegistry.java b/dubbo-registry-extensions/dubbo-registry-sofa/src/main/java/org/apache/dubbo/registry/sofa/SofaRegistry.java
new file mode 100644
index 0000000..60debfa
--- /dev/null
+++ b/dubbo-registry-extensions/dubbo-registry-sofa/src/main/java/org/apache/dubbo/registry/sofa/SofaRegistry.java
@@ -0,0 +1,295 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dubbo.registry.sofa;
+
+import org.apache.dubbo.common.URL;
+import org.apache.dubbo.common.utils.StringUtils;
+import org.apache.dubbo.registry.NotifyListener;
+import org.apache.dubbo.registry.support.FailbackRegistry;
+
+import com.alipay.sofa.registry.client.api.RegistryClient;
+import com.alipay.sofa.registry.client.api.RegistryClientConfig;
+import com.alipay.sofa.registry.client.api.Subscriber;
+import com.alipay.sofa.registry.client.api.model.RegistryType;
+import com.alipay.sofa.registry.client.api.model.UserData;
+import com.alipay.sofa.registry.client.api.registration.PublisherRegistration;
+import com.alipay.sofa.registry.client.api.registration.SubscriberRegistration;
+import com.alipay.sofa.registry.client.provider.DefaultRegistryClient;
+import com.alipay.sofa.registry.client.provider.DefaultRegistryClientConfigBuilder;
+import com.alipay.sofa.registry.core.model.ScopeEnum;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.dubbo.common.constants.CommonConstants.APPLICATION_KEY;
+import static org.apache.dubbo.common.constants.CommonConstants.DUBBO;
+import static org.apache.dubbo.common.constants.CommonConstants.GROUP_KEY;
+import static org.apache.dubbo.common.constants.CommonConstants.VERSION_KEY;
+import static org.apache.dubbo.registry.Constants.CONSUMER_PROTOCOL;
+import static org.apache.dubbo.registry.Constants.PROVIDER_PROTOCOL;
+import static org.apache.dubbo.registry.Constants.REGISTER_KEY;
+import static org.apache.dubbo.registry.Constants.SUBSCRIBE_KEY;
+import static org.apache.dubbo.registry.sofa.SofaRegistryConstants.ADDRESS_WAIT_TIME_KEY;
+import static org.apache.dubbo.registry.sofa.SofaRegistryConstants.DEFAULT_GROUP;
+import static org.apache.dubbo.registry.sofa.SofaRegistryConstants.LOCAL_DATA_CENTER;
+import static org.apache.dubbo.registry.sofa.SofaRegistryConstants.LOCAL_REGION;
+
+/**
+ * The Sofa registry.
+ *
+ * @since 2.7.2
+ */
+public class SofaRegistry extends FailbackRegistry {
+
+ /**
+ * Cache subscriber by dataId
+ */
+ private final Map<String, Subscriber> subscribers = new ConcurrentHashMap<>();
+
+ /**
+ * Direct registry client
+ */
+ private RegistryClient registryClient;
+ /**
+ * wait address from registry
+ */
+ private int waitAddressTimeout;
+
+ /**
+ * Instantiates a new Sofa registry.
+ *
+ * @param url the url
+ */
+ public SofaRegistry(URL url) {
+ super(url);
+ if (logger.isInfoEnabled()) {
+ logger.info("Build sofa registry by url:" + url);
+ }
+ this.registryClient = buildClient(url);
+ this.waitAddressTimeout = Integer.parseInt(System.getProperty(ADDRESS_WAIT_TIME_KEY, "5000"));
+ }
+
+ /**
+ * Build client registry client.
+ *
+ * @param url the url
+ * @return the registry client
+ */
+ protected RegistryClient buildClient(URL url) {
+ RegistryClientConfig config = DefaultRegistryClientConfigBuilder.start()
+ .setDataCenter(LOCAL_DATA_CENTER)
+ .setZone(LOCAL_REGION)
+ .setRegistryEndpoint(url.getHost())
+ .setRegistryEndpointPort(url.getPort()).build();
+
+ DefaultRegistryClient registryClient = new DefaultRegistryClient(config);
+ registryClient.init();
+ return registryClient;
+ }
+
+ @Override
+ public boolean isAvailable() {
+ return true;
+ }
+
+ @Override
+ public void doRegister(URL url) {
+ if (!url.getParameter(REGISTER_KEY, true)
+ || CONSUMER_PROTOCOL.equals(url.getProtocol())) {
+ return;
+ }
+
+ String serviceName = buildServiceName(url);
+ String serviceData = url.toFullString();
+
+ PublisherRegistration registration = new PublisherRegistration(serviceName);
+ addAttributesForPub(registration);
+
+ registryClient.register(registration, serviceData);
+ }
+
+ /**
+ * Add attributes for pub.
+ *
+ * @param publisherRegistration the publisher registration
+ */
+ protected void addAttributesForPub(PublisherRegistration publisherRegistration) {
+ publisherRegistration.setGroup(DEFAULT_GROUP);
+ }
+
+ @Override
+ public void doUnregister(URL url) {
+ if (!url.getParameter(REGISTER_KEY, true)
+ || CONSUMER_PROTOCOL.equals(url.getProtocol())) {
+ return;
+ }
+ String serviceName = buildServiceName(url);
+ registryClient.unregister(serviceName, DEFAULT_GROUP, RegistryType.PUBLISHER);
+ }
+
+ @Override
+ public void doSubscribe(URL url, final NotifyListener listener) {
+ if (!url.getParameter(SUBSCRIBE_KEY, true)
+ || PROVIDER_PROTOCOL.equals(url.getProtocol())) {
+ return;
+ }
+
+ String serviceName = buildServiceName(url);
+ // com.alipay.test.TestService:1.0:group@dubbo
+ Subscriber listSubscriber = subscribers.get(serviceName);
+
+ if (listSubscriber != null) {
+ logger.warn("Service name [" + serviceName + "] have bean registered in SOFARegistry.");
+
+ CountDownLatch countDownLatch = new CountDownLatch(1);
+ handleRegistryData(listSubscriber.peekData(), listener, countDownLatch);
+ waitAddress(serviceName, countDownLatch);
+ return;
+ }
+
+ final CountDownLatch latch = new CountDownLatch(1);
+ SubscriberRegistration subscriberRegistration = new SubscriberRegistration(serviceName,
+ (dataId, data) -> {
+ //record change
+ printAddressData(dataId, data);
+ handleRegistryData(data, listener, latch);
+ });
+
+ addAttributesForSub(subscriberRegistration);
+ listSubscriber = registryClient.register(subscriberRegistration);
+
+ subscribers.put(serviceName, listSubscriber);
+
+ waitAddress(serviceName, latch);
+ }
+
+ private void waitAddress(String serviceName, CountDownLatch countDownLatch) {
+ try {
+ if (!countDownLatch.await(waitAddressTimeout, TimeUnit.MILLISECONDS)) {
+ logger.warn("Subscribe data failed by dataId " + serviceName);
+ }
+ } catch (Exception e) {
+ logger.error("Error when wait Address!", e);
+ }
+ }
+
+ @Override
+ public void doUnsubscribe(URL url, NotifyListener listener) {
+ if (!url.getParameter(SUBSCRIBE_KEY, true)
+ || PROVIDER_PROTOCOL.equals(url.getProtocol())) {
+ return;
+ }
+ String serviceName = buildServiceName(url);
+
+ registryClient.unregister(serviceName, DEFAULT_GROUP, RegistryType.SUBSCRIBER);
+ }
+
+ private void handleRegistryData(UserData data, NotifyListener notifyListener,
+ CountDownLatch latch) {
+ try {
+ List<URL> urls = new ArrayList<>();
+ if (null != data) {
+
+ List<String> datas = flatUserData(data);
+ for (String serviceUrl : datas) {
+ URL url = URL.valueOf(serviceUrl);
+ String serverApplication = url.getParameter(APPLICATION_KEY);
+ if (StringUtils.isNotEmpty(serverApplication)) {
+ url = url.addParameter("dstApp", serverApplication);
+ }
+ urls.add(url);
+ }
+ }
+ notifyListener.notify(urls);
+ } finally {
+ latch.countDown();
+ }
+ }
+
+ private String buildServiceName(URL url) {
+ // return url.getServiceKey();
+ StringBuilder buf = new StringBuilder();
+ buf.append(url.getServiceInterface());
+ String version = url.getParameter(VERSION_KEY);
+ if (StringUtils.isNotEmpty(version)) {
+ buf.append(":").append(version);
+ }
+ String group = url.getParameter(GROUP_KEY);
+ if (StringUtils.isNotEmpty(group)) {
+ buf.append(":").append(group);
+ }
+ buf.append("@").append(DUBBO);
+ return buf.toString();
+ }
+
+ /**
+ * Print address data.
+ *
+ * @param dataId the data id
+ * @param userData the user data
+ */
+ protected void printAddressData(String dataId, UserData userData) {
+
+ List<String> datas;
+ if (userData == null) {
+ datas = new ArrayList<>(0);
+ } else {
+ datas = flatUserData(userData);
+ }
+
+ StringBuilder sb = new StringBuilder();
+ for (String provider : datas) {
+ sb.append(" >>> ").append(provider).append("\n");
+ }
+ if (logger.isInfoEnabled()) {
+ logger.info("Receive updated RPC service addresses: service[" + dataId
+ + "]\n .Available target addresses size [" + datas.size() + "]\n"
+ + sb.toString());
+ }
+ }
+
+ /**
+ * Add attributes for sub.
+ *
+ * @param subscriberRegistration the subscriber registration
+ */
+ protected void addAttributesForSub(SubscriberRegistration subscriberRegistration) {
+ subscriberRegistration.setGroup(DEFAULT_GROUP);
+ subscriberRegistration.setScopeEnum(ScopeEnum.global);
+ }
+
+ /**
+ * Flat user data list.
+ *
+ * @param userData the user data
+ * @return the list
+ */
+ protected List<String> flatUserData(UserData userData) {
+ List<String> result = new ArrayList<>();
+ Map<String, List<String>> zoneData = userData.getZoneData();
+
+ for (Map.Entry<String, List<String>> entry : zoneData.entrySet()) {
+ result.addAll(entry.getValue());
+ }
+
+ return result;
+ }
+}
diff --git a/dubbo-registry-extensions/dubbo-registry-sofa/src/main/java/org/apache/dubbo/registry/sofa/SofaRegistryConstants.java b/dubbo-registry-extensions/dubbo-registry-sofa/src/main/java/org/apache/dubbo/registry/sofa/SofaRegistryConstants.java
new file mode 100644
index 0000000..f832e80
--- /dev/null
+++ b/dubbo-registry-extensions/dubbo-registry-sofa/src/main/java/org/apache/dubbo/registry/sofa/SofaRegistryConstants.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dubbo.registry.sofa;
+
+/**
+ * @since 2.7.2
+ */
+public class SofaRegistryConstants {
+
+ /**
+ * Default data center
+ */
+ public static final String LOCAL_DATA_CENTER = "DefaultDataCenter";
+
+ /**
+ * Default region
+ */
+ public static final String LOCAL_REGION = "DEFAULT_ZONE";
+
+ /**
+ * Default group
+ */
+ public static final String DEFAULT_GROUP = "SOFA";
+
+ /**
+ * parameter for address.wait.time of rpc reference
+ */
+ public static final String ADDRESS_WAIT_TIME_KEY = "rpc.reference.address.wait.time";
+}
diff --git a/dubbo-registry-extensions/dubbo-registry-sofa/src/main/java/org/apache/dubbo/registry/sofa/SofaRegistryFactory.java b/dubbo-registry-extensions/dubbo-registry-sofa/src/main/java/org/apache/dubbo/registry/sofa/SofaRegistryFactory.java
new file mode 100644
index 0000000..18a7809
--- /dev/null
+++ b/dubbo-registry-extensions/dubbo-registry-sofa/src/main/java/org/apache/dubbo/registry/sofa/SofaRegistryFactory.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dubbo.registry.sofa;
+
+import org.apache.dubbo.common.URL;
+import org.apache.dubbo.registry.Registry;
+import org.apache.dubbo.registry.support.AbstractRegistryFactory;
+
+/**
+ * @since 2.7.2
+ */
+public class SofaRegistryFactory extends AbstractRegistryFactory {
+
+ @Override
+ protected Registry createRegistry(URL url) {
+ initEnvironment(url);
+ return new SofaRegistry(url);
+ }
+
+ /**
+ * For extension, such as load zone/accessKey/secretKey/...
+ *
+ * @param url URL
+ */
+ protected void initEnvironment(URL url) {
+ }
+}
diff --git a/dubbo-registry-extensions/dubbo-registry-sofa/src/main/java/org/apache/dubbo/registry/sofa/SofaRegistryInstance.java b/dubbo-registry-extensions/dubbo-registry-sofa/src/main/java/org/apache/dubbo/registry/sofa/SofaRegistryInstance.java
new file mode 100644
index 0000000..65befdb
--- /dev/null
+++ b/dubbo-registry-extensions/dubbo-registry-sofa/src/main/java/org/apache/dubbo/registry/sofa/SofaRegistryInstance.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dubbo.registry.sofa;
+
+
+import java.util.HashMap;
+import java.util.Map;
+
+public class SofaRegistryInstance {
+ private String id;
+
+ private String host;
+
+ private int port;
+
+ private String name;
+
+ private Map<String, String> metadata = new HashMap<>();
+
+ private SofaRegistryInstance() {
+ }
+
+ public SofaRegistryInstance(String id, String host, int port, String name, Map<String, String> metadata) {
+ this.id = id;
+ this.host = host;
+ this.port = port;
+ this.name = name;
+ this.metadata = metadata;
+ }
+
+ public String getHost() {
+ return host;
+ }
+
+ public void setHost(String host) {
+ this.host = host;
+ }
+
+ public int getPort() {
+ return port;
+ }
+
+ public void setPort(int port) {
+ this.port = port;
+ }
+
+ public String getId() {
+ return id;
+ }
+
+ public void setId(String id) {
+ this.id = id;
+ }
+
+ public String getName() {
+ return this.name;
+ }
+
+ public void setName(String name) {
+ this.name = name;
+ }
+
+ public Map<String, String> getMetadata() {
+ return this.metadata;
+ }
+
+ public void setMetadata(Map<String, String> metadata) {
+ this.metadata = metadata;
+ }
+
+ @Override
+ public String toString() {
+ return "SofaRegistryInstance{" + "id='" + this.id + '\''+ "host='" + this.host + '\'' + "port='" + this.port + '\''+ ", name='" + this.name
+ + '\'' + ", metadata=" + this.metadata + '}';
+ }
+}
diff --git a/dubbo-registry-extensions/dubbo-registry-sofa/src/main/java/org/apache/dubbo/registry/sofa/SofaRegistryServiceDiscovery.java b/dubbo-registry-extensions/dubbo-registry-sofa/src/main/java/org/apache/dubbo/registry/sofa/SofaRegistryServiceDiscovery.java
new file mode 100644
index 0000000..6867685
--- /dev/null
+++ b/dubbo-registry-extensions/dubbo-registry-sofa/src/main/java/org/apache/dubbo/registry/sofa/SofaRegistryServiceDiscovery.java
@@ -0,0 +1,301 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dubbo.registry.sofa;
+
+import org.apache.dubbo.common.URL;
+import org.apache.dubbo.common.logger.Logger;
+import org.apache.dubbo.common.logger.LoggerFactory;
+import org.apache.dubbo.registry.client.AbstractServiceDiscovery;
+import org.apache.dubbo.registry.client.DefaultServiceInstance;
+import org.apache.dubbo.registry.client.ServiceInstance;
+import org.apache.dubbo.registry.client.event.ServiceInstancesChangedEvent;
+import org.apache.dubbo.registry.client.event.listener.ServiceInstancesChangedListener;
+import org.apache.dubbo.rpc.model.ApplicationModel;
+
+import com.alipay.sofa.registry.client.api.Publisher;
+import com.alipay.sofa.registry.client.api.RegistryClientConfig;
+import com.alipay.sofa.registry.client.api.Subscriber;
+import com.alipay.sofa.registry.client.api.model.RegistryType;
+import com.alipay.sofa.registry.client.api.model.UserData;
+import com.alipay.sofa.registry.client.api.registration.PublisherRegistration;
+import com.alipay.sofa.registry.client.api.registration.SubscriberRegistration;
+import com.alipay.sofa.registry.client.provider.DefaultRegistryClient;
+import com.alipay.sofa.registry.client.provider.DefaultRegistryClientConfigBuilder;
+import com.alipay.sofa.registry.core.model.ScopeEnum;
+import com.google.gson.Gson;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.dubbo.registry.sofa.SofaRegistryConstants.ADDRESS_WAIT_TIME_KEY;
+import static org.apache.dubbo.registry.sofa.SofaRegistryConstants.LOCAL_DATA_CENTER;
+import static org.apache.dubbo.registry.sofa.SofaRegistryConstants.LOCAL_REGION;
+
+
+public class SofaRegistryServiceDiscovery extends AbstractServiceDiscovery {
+
+ private static final Logger LOGGER = LoggerFactory.getLogger(SofaRegistryServiceDiscovery.class);
+
+ private static final String DEFAULT_GROUP = "dubbo";
+
+ private DefaultRegistryClient registryClient;
+
+ private int waitAddressTimeout;
+
+ private RegistryClientConfig registryClientConfig;
+
+ private final Map<String, Publisher> publishers = new ConcurrentHashMap<>();
+
+ private final Map<String, SofaRegistryListener> sofaRegistryListenerMap = new ConcurrentHashMap<>();
+
+
+ private Gson gson = new Gson();
+
+ public SofaRegistryServiceDiscovery(ApplicationModel applicationModel, URL registryURL) {
+ super(applicationModel, registryURL);
+
+ this.registryClientConfig = DefaultRegistryClientConfigBuilder.start()
+ .setDataCenter(LOCAL_DATA_CENTER)
+ .setZone(LOCAL_REGION)
+ .setRegistryEndpoint(registryURL.getHost())
+ .setRegistryEndpointPort(registryURL.getPort()).build();
+
+ registryClient = new DefaultRegistryClient(this.registryClientConfig);
+ registryClient.init();
+
+ this.waitAddressTimeout = Integer.parseInt(System.getProperty(ADDRESS_WAIT_TIME_KEY, "5000"));
+ }
+
+ @Override
+ public URL getUrl() {
+ return registryURL;
+ }
+
+ @Override
+ protected void doDestroy() throws Exception {
+
+ }
+
+ @Override
+ public void doRegister(ServiceInstance serviceInstance) {
+ SofaRegistryInstance sofaRegistryInstance = new SofaRegistryInstance(UUID.randomUUID().toString(), serviceInstance.getHost(), serviceInstance.getPort(), serviceInstance.getServiceName(), serviceInstance.getMetadata());
+ Publisher publisher = publishers.get(serviceInstance.getServiceName());
+ this.serviceInstance = serviceInstance;
+ if (null == publisher) {
+ PublisherRegistration registration = new PublisherRegistration(serviceInstance.getServiceName());
+ registration.setGroup(DEFAULT_GROUP);
+ publisher = registryClient.register(registration, gson.toJson(sofaRegistryInstance));
+
+ publishers.put(serviceInstance.getServiceName(), publisher);
+ } else {
+ publisher.republish(gson.toJson(sofaRegistryInstance));
+ }
+ }
+
+ @Override
+ protected void doUnregister(ServiceInstance serviceInstance) {
+ registryClient.unregister(serviceInstance.getServiceName(), DEFAULT_GROUP, RegistryType.PUBLISHER);
+ }
+
+ @Override
+ public synchronized void addServiceInstancesChangedListener(ServiceInstancesChangedListener listener) throws NullPointerException, IllegalArgumentException {
+ listener.getServiceNames().forEach(serviceName -> registerServiceWatcher(serviceName, listener));
+ }
+
+ protected void registerServiceWatcher(String serviceName, ServiceInstancesChangedListener listener) {
+ SofaRegistryListener sofaRegistryListener = sofaRegistryListenerMap.get(serviceName);
+
+ if (null == sofaRegistryListener) {
+ sofaRegistryListener = new SofaRegistryListener(serviceName);
+ sofaRegistryListenerMap.put(serviceName, sofaRegistryListener);
+ sofaRegistryListener.addListener(listener);
+ sofaRegistryListener.start();
+ } else {
+ sofaRegistryListener.addListener(listener);
+ }
+
+ }
+
+ @Override
+ public synchronized List<ServiceInstance> getInstances(String serviceName) throws NullPointerException {
+ SofaRegistryListener sofaRegistryListener = sofaRegistryListenerMap.get(serviceName);
+
+ if (null == sofaRegistryListener) {
+ sofaRegistryListener = new SofaRegistryListener(serviceName);
+ sofaRegistryListenerMap.put(serviceName, sofaRegistryListener);
+ sofaRegistryListener.start();
+ }
+
+ return sofaRegistryListener.peekData();
+ }
+
+ @Override
+ public synchronized void removeServiceInstancesChangedListener(ServiceInstancesChangedListener listener) throws IllegalArgumentException {
+ for (String serviceName : listener.getServiceNames()) {
+ SofaRegistryListener sofaRegistryListener = sofaRegistryListenerMap.get(serviceName);
+
+ if (null != sofaRegistryListener) {
+ sofaRegistryListener.removeListener(listener);
+
+ if (sofaRegistryListener.getListenerCount() == 0) {
+ sofaRegistryListener.stop();
+ sofaRegistryListenerMap.remove(serviceName);
+ }
+ }
+ }
+ }
+
+ public class SofaRegistryListener {
+ private final String serviceName;
+ private volatile Subscriber subscriber;
+ private final List<ServiceInstancesChangedListener> listeners = new CopyOnWriteArrayList<>();
+
+ private volatile List<ServiceInstance> serviceInstances;
+
+ public SofaRegistryListener(String serviceName) {
+ this.serviceName = serviceName;
+ }
+
+ public void start() {
+ final CountDownLatch latch = new CountDownLatch(1);
+ SubscriberRegistration subscriberRegistration = new SubscriberRegistration(serviceName, (dataId, data) -> {
+ handleRegistryData(dataId, data, latch);
+ });
+ subscriberRegistration.setGroup(DEFAULT_GROUP);
+ subscriberRegistration.setScopeEnum(ScopeEnum.global);
+
+ subscriber = registryClient.register(subscriberRegistration);
+
+ waitAddress(serviceName, latch);
+ }
+
+ public void stop() {
+ if (null != subscriber) {
+ subscriber.unregister();
+ }
+ }
+
+ private void handleRegistryData(String dataId, UserData userData, CountDownLatch latch) {
+ try {
+ List<String> datas = getUserData(dataId, userData);
+ List<ServiceInstance> newServiceInstances = new ArrayList<>(datas.size());
+
+ for (String serviceData : datas) {
+ SofaRegistryInstance sri = gson.fromJson(serviceData, SofaRegistryInstance.class);
+
+ DefaultServiceInstance serviceInstance = new DefaultServiceInstance(dataId, sri.getHost(), sri.getPort(), applicationModel);
+ serviceInstance.setMetadata(sri.getMetadata());
+ newServiceInstances.add(serviceInstance);
+ }
+
+ this.serviceInstances = newServiceInstances;
+
+ for (ServiceInstancesChangedListener listener : listeners) {
+ listener.onEvent(new ServiceInstancesChangedEvent(dataId, serviceInstances));
+ }
+ } finally {
+ if (null != latch) {
+ latch.countDown();
+ }
+ }
+ }
+
+ public List<ServiceInstance> peekData() {
+ return serviceInstances;
+ }
+
+ public void addListener(ServiceInstancesChangedListener listener) {
+ listeners.add(listener);
+ }
+
+ public void removeListener(ServiceInstancesChangedListener listener) {
+ listeners.remove(listener);
+ }
+
+ public int getListenerCount() {
+ return listeners.size();
+ }
+ }
+
+ private void waitAddress(String serviceName, CountDownLatch countDownLatch) {
+ try {
+ if (!countDownLatch.await(waitAddressTimeout, TimeUnit.MILLISECONDS)) {
+ LOGGER.warn("Subscribe data failed by dataId " + serviceName);
+ }
+ } catch (Exception e) {
+ LOGGER.error("Error when wait Address!", e);
+ }
+ }
+
+ /**
+ * Print address data.
+ *
+ * @param dataId the data id
+ * @param userData the user data
+ */
+ protected List<String> getUserData(String dataId, UserData userData) {
+
+ List<String> datas = null;
+ if (userData == null) {
+ datas = new ArrayList<>(0);
+ } else {
+ datas = flatUserData(userData);
+ }
+
+ StringBuilder sb = new StringBuilder();
+ for (String provider : datas) {
+ sb.append(" >>> ").append(provider).append("\n");
+ }
+ if (LOGGER.isInfoEnabled()) {
+ LOGGER.info("Receive updated RPC service addresses: service[" + dataId
+ + "]\n .Available target addresses size [" + datas.size() + "]\n"
+ + sb.toString());
+ }
+
+ return datas;
+ }
+
+ /**
+ * Flat user data list.
+ *
+ * @param userData the user data
+ * @return the list
+ */
+ protected List<String> flatUserData(UserData userData) {
+ List<String> result = new ArrayList<>();
+ Map<String, List<String>> zoneData = userData.getZoneData();
+
+ for (Map.Entry<String, List<String>> entry : zoneData.entrySet()) {
+ result.addAll(entry.getValue());
+ }
+
+ return result;
+ }
+
+ @Override
+ public Set<String> getServices() {
+ return sofaRegistryListenerMap.keySet();
+ }
+
+}
diff --git a/dubbo-registry-extensions/dubbo-registry-sofa/src/main/java/org/apache/dubbo/registry/sofa/SofaRegistryServiceDiscoveryFactory.java b/dubbo-registry-extensions/dubbo-registry-sofa/src/main/java/org/apache/dubbo/registry/sofa/SofaRegistryServiceDiscoveryFactory.java
new file mode 100644
index 0000000..b3b03e5
--- /dev/null
+++ b/dubbo-registry-extensions/dubbo-registry-sofa/src/main/java/org/apache/dubbo/registry/sofa/SofaRegistryServiceDiscoveryFactory.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dubbo.registry.sofa;
+
+import org.apache.dubbo.common.URL;
+import org.apache.dubbo.registry.client.AbstractServiceDiscoveryFactory;
+import org.apache.dubbo.registry.client.ServiceDiscovery;
+
+public class SofaRegistryServiceDiscoveryFactory extends AbstractServiceDiscoveryFactory {
+ @Override
+ protected ServiceDiscovery createDiscovery(URL registryURL) {
+ return new SofaRegistryServiceDiscovery(applicationModel, registryURL);
+ }
+}
diff --git a/dubbo-registry-extensions/dubbo-registry-sofa/src/main/resources/META-INF/dubbo/internal/org.apache.dubbo.registry.RegistryFactory b/dubbo-registry-extensions/dubbo-registry-sofa/src/main/resources/META-INF/dubbo/internal/org.apache.dubbo.registry.RegistryFactory
new file mode 100644
index 0000000..dff4cbb
--- /dev/null
+++ b/dubbo-registry-extensions/dubbo-registry-sofa/src/main/resources/META-INF/dubbo/internal/org.apache.dubbo.registry.RegistryFactory
@@ -0,0 +1 @@
+sofa=org.apache.dubbo.registry.sofa.SofaRegistryFactory
\ No newline at end of file
diff --git a/dubbo-registry-extensions/dubbo-registry-sofa/src/main/resources/META-INF/dubbo/internal/org.apache.dubbo.registry.client.ServiceDiscovery b/dubbo-registry-extensions/dubbo-registry-sofa/src/main/resources/META-INF/dubbo/internal/org.apache.dubbo.registry.client.ServiceDiscovery
new file mode 100644
index 0000000..4578cbd
--- /dev/null
+++ b/dubbo-registry-extensions/dubbo-registry-sofa/src/main/resources/META-INF/dubbo/internal/org.apache.dubbo.registry.client.ServiceDiscovery
@@ -0,0 +1 @@
+sofa=org.apache.dubbo.registry.sofa.SofaRegistryServiceDiscovery
\ No newline at end of file
diff --git a/dubbo-registry-extensions/dubbo-registry-sofa/src/main/resources/META-INF/dubbo/internal/org.apache.dubbo.registry.client.ServiceDiscoveryFactory b/dubbo-registry-extensions/dubbo-registry-sofa/src/main/resources/META-INF/dubbo/internal/org.apache.dubbo.registry.client.ServiceDiscoveryFactory
new file mode 100644
index 0000000..d16d75c
--- /dev/null
+++ b/dubbo-registry-extensions/dubbo-registry-sofa/src/main/resources/META-INF/dubbo/internal/org.apache.dubbo.registry.client.ServiceDiscoveryFactory
@@ -0,0 +1 @@
+sofa=org.apache.dubbo.registry.sofa.SofaRegistryServiceDiscoveryFactory
\ No newline at end of file
diff --git a/dubbo-registry-extensions/dubbo-registry-sofa/src/test/java/org/apache/dubbo/registry/sofa/HelloService.java b/dubbo-registry-extensions/dubbo-registry-sofa/src/test/java/org/apache/dubbo/registry/sofa/HelloService.java
new file mode 100644
index 0000000..cc66a5e
--- /dev/null
+++ b/dubbo-registry-extensions/dubbo-registry-sofa/src/test/java/org/apache/dubbo/registry/sofa/HelloService.java
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dubbo.registry.sofa;
+
+/**
+ */
+public interface HelloService {
+
+ String sayHello(String name);
+}
diff --git a/dubbo-registry-extensions/dubbo-registry-sofa/src/test/java/org/apache/dubbo/registry/sofa/HelloServiceImpl.java b/dubbo-registry-extensions/dubbo-registry-sofa/src/test/java/org/apache/dubbo/registry/sofa/HelloServiceImpl.java
new file mode 100644
index 0000000..a9fb0ca
--- /dev/null
+++ b/dubbo-registry-extensions/dubbo-registry-sofa/src/test/java/org/apache/dubbo/registry/sofa/HelloServiceImpl.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dubbo.registry.sofa;
+
+import com.alipay.sofa.registry.server.test.TestRegistryMain;
+
+/**
+ */
+public class HelloServiceImpl implements HelloService {
+
+ private final String result;
+
+ public HelloServiceImpl(String result) {
+ this.result = result;
+ }
+
+ @Override
+ public String sayHello(String name) {
+ return result != null ? result : "hello, " + name + "!";
+ }
+
+ public static void main(String[] args) {
+ TestRegistryMain registryMain = new TestRegistryMain();
+ try {
+ registryMain.startRegistry();
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ }
+}
diff --git a/dubbo-registry-extensions/dubbo-registry-sofa/src/test/resources/log4j.properties b/dubbo-registry-extensions/dubbo-registry-sofa/src/test/resources/log4j.properties
new file mode 100644
index 0000000..8de4c4f
--- /dev/null
+++ b/dubbo-registry-extensions/dubbo-registry-sofa/src/test/resources/log4j.properties
@@ -0,0 +1,7 @@
+###set log levels###
+log4j.rootLogger=info, stdout
+###output to the console###
+log4j.appender.stdout=org.apache.log4j.ConsoleAppender
+log4j.appender.stdout.Target=System.out
+log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
+log4j.appender.stdout.layout.ConversionPattern=[%d{dd/MM/yy HH:mm:ss:SSS z}] %t %5p %c{2}: %m%n
diff --git a/dubbo-registry-extensions/pom.xml b/dubbo-registry-extensions/pom.xml
index 78ba1d7..6d8b87f 100644
--- a/dubbo-registry-extensions/pom.xml
+++ b/dubbo-registry-extensions/pom.xml
@@ -33,5 +33,9 @@
<module>dubbo-registry-kubernetes</module>
<module>dubbo-registry-dns</module>
<module>dubbo-registry-xds</module>
+ <module>dubbo-registry-consul</module>
+ <module>dubbo-registry-etcd3</module>
+ <module>dubbo-registry-redis</module>
+ <module>dubbo-registry-sofa</module>
</modules>
</project>