You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dubbo.apache.org by al...@apache.org on 2021/01/23 08:12:51 UTC
[dubbo] branch 3.0 updated: Add basic framework for xDS Registry
(#7112)
This is an automated email from the ASF dual-hosted git repository.
albumenj pushed a commit to branch 3.0
in repository https://gitbox.apache.org/repos/asf/dubbo.git
The following commit(s) were added to refs/heads/3.0 by this push:
new b0043c3 Add basic framework for xDS Registry (#7112)
b0043c3 is described below
commit b0043c3f5d5aa8eb4b41ea8d4f6db36255a52ef1
Author: Albumen Kevin <jh...@gmail.com>
AuthorDate: Sat Jan 23 16:12:21 2021 +0800
Add basic framework for xDS Registry (#7112)
* introduce basic xds registry
* complete protocol develop
* add xds dependency to project level
* Finish self-host metadata fetch
* add comment
* cleanup pom
---
dubbo-dependencies-bom/pom.xml | 6 +
dubbo-distribution/dubbo-all/pom.xml | 8 +
dubbo-distribution/dubbo-bom/pom.xml | 5 +
.../client/SelfHostMetaServiceDiscovery.java | 3 +-
dubbo-registry/dubbo-registry-xds/pom.xml | 68 ++++++
.../org/apache/dubbo/registry/xds/XdsRegistry.java | 58 +++++
.../dubbo/registry/xds/XdsRegistryFactory.java | 34 +++
.../dubbo/registry/xds/XdsServiceDiscovery.java | 78 ++++++
.../registry/xds/XdsServiceDiscoveryFactory.java | 28 +++
.../dubbo/registry/xds/util/NodeBuilder.java | 29 +++
.../dubbo/registry/xds/util/PilotExchanger.java | 146 ++++++++++++
.../apache/dubbo/registry/xds/util/XdsChannel.java | 58 +++++
.../xds/util/protocol/AbstractProtocol.java | 262 +++++++++++++++++++++
.../registry/xds/util/protocol/DeltaResource.java | 32 +++
.../registry/xds/util/protocol/XdsProtocol.java | 48 ++++
.../xds/util/protocol/delta/DeltaEndpoint.java | 51 ++++
.../xds/util/protocol/delta/DeltaListener.java | 49 ++++
.../xds/util/protocol/delta/DeltaRoute.java | 47 ++++
.../xds/util/protocol/impl/EdsProtocol.java | 114 +++++++++
.../xds/util/protocol/impl/LdsProtocol.java | 123 ++++++++++
.../xds/util/protocol/impl/RdsProtocol.java | 115 +++++++++
.../xds/util/protocol/message/Endpoint.java | 88 +++++++
.../xds/util/protocol/message/EndpointResult.java | 62 +++++
.../xds/util/protocol/message/ListenerResult.java | 70 ++++++
.../xds/util/protocol/message/RouteResult.java | 73 ++++++
.../org.apache.dubbo.registry.RegistryFactory | 1 +
...g.apache.dubbo.registry.client.ServiceDiscovery | 1 +
...e.dubbo.registry.client.ServiceDiscoveryFactory | 1 +
dubbo-registry/pom.xml | 1 +
29 files changed, 1658 insertions(+), 1 deletion(-)
diff --git a/dubbo-dependencies-bom/pom.xml b/dubbo-dependencies-bom/pom.xml
index 8d84b38..f409ae8 100644
--- a/dubbo-dependencies-bom/pom.xml
+++ b/dubbo-dependencies-bom/pom.xml
@@ -125,6 +125,7 @@
<snakeyaml_version>1.20</snakeyaml_version>
<commons_lang3_version>3.8.1</commons_lang3_version>
<protostuff_version>1.5.9</protostuff_version>
+ <envoy_api_version>0.1.23</envoy_api_version>
<rs_api_version>2.0</rs_api_version>
<resteasy_version>3.0.19.Final</resteasy_version>
@@ -533,6 +534,11 @@
<artifactId>commons-lang3</artifactId>
<version>${commons_lang3_version}</version>
</dependency>
+ <dependency>
+ <groupId>io.envoyproxy.controlplane</groupId>
+ <artifactId>api</artifactId>
+ <version>${envoy_api_version}</version>
+ </dependency>
<!-- for dubbo-rpc-webservice -->
<dependency>
diff --git a/dubbo-distribution/dubbo-all/pom.xml b/dubbo-distribution/dubbo-all/pom.xml
index e8e4406..6b009b4 100644
--- a/dubbo-distribution/dubbo-all/pom.xml
+++ b/dubbo-distribution/dubbo-all/pom.xml
@@ -173,6 +173,13 @@
</dependency>
<dependency>
<groupId>org.apache.dubbo</groupId>
+ <artifactId>dubbo-registry-xds</artifactId>
+ <version>${project.version}</version>
+ <scope>compile</scope>
+ <optional>true</optional>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.dubbo</groupId>
<artifactId>dubbo-monitor-api</artifactId>
<version>${project.version}</version>
<scope>compile</scope>
@@ -374,6 +381,7 @@
<include>org.apache.dubbo:dubbo-registry-multiple</include>
<include>org.apache.dubbo:dubbo-registry-kubernetes</include>
<include>org.apache.dubbo:dubbo-registry-dns</include>
+ <include>org.apache.dubbo:dubbo-registry-xds</include>
<include>org.apache.dubbo:dubbo-monitor-api</include>
<include>org.apache.dubbo:dubbo-monitor-default</include>
<include>org.apache.dubbo:dubbo-container-api</include>
diff --git a/dubbo-distribution/dubbo-bom/pom.xml b/dubbo-distribution/dubbo-bom/pom.xml
index d3c1b33..5826d10 100644
--- a/dubbo-distribution/dubbo-bom/pom.xml
+++ b/dubbo-distribution/dubbo-bom/pom.xml
@@ -135,6 +135,11 @@
</dependency>
<dependency>
<groupId>org.apache.dubbo</groupId>
+ <artifactId>dubbo-registry-xds</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.dubbo</groupId>
<artifactId>dubbo-monitor-api</artifactId>
<version>${project.version}</version>
</dependency>
diff --git a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/SelfHostMetaServiceDiscovery.java b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/SelfHostMetaServiceDiscovery.java
index 25ab77e..1034420 100644
--- a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/SelfHostMetaServiceDiscovery.java
+++ b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/SelfHostMetaServiceDiscovery.java
@@ -240,7 +240,8 @@ public abstract class SelfHostMetaServiceDiscovery implements ServiceDiscovery {
serviceInstanceRevisionMap.put(serviceName, serviceInstanceRevision));
if (logger.isDebugEnabled()) {
- logger.debug("Poll DNS data. Service Instance changed: " + changed + " Service Name: " + serviceName);
+ logger.debug("Service changed event received (possibly because of DNS polling). " +
+ "Service Instance changed: " + changed + " Service Name: " + serviceName);
}
if (changed) {
diff --git a/dubbo-registry/dubbo-registry-xds/pom.xml b/dubbo-registry/dubbo-registry-xds/pom.xml
new file mode 100644
index 0000000..effda5a
--- /dev/null
+++ b/dubbo-registry/dubbo-registry-xds/pom.xml
@@ -0,0 +1,68 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+ -->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <parent>
+ <groupId>org.apache.dubbo</groupId>
+ <artifactId>dubbo-registry</artifactId>
+ <version>${revision}</version>
+ </parent>
+ <modelVersion>4.0.0</modelVersion>
+
+ <artifactId>dubbo-registry-xds</artifactId>
+ <name>${project.artifactId}</name>
+ <description>The Xds registry module of Dubbo project</description>
+
+ <dependencies>
+
+ <dependency>
+ <groupId>org.apache.dubbo</groupId>
+ <artifactId>dubbo-registry-api</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.dubbo</groupId>
+ <artifactId>dubbo-common</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>io.grpc</groupId>
+ <artifactId>grpc-protobuf</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>io.grpc</groupId>
+ <artifactId>grpc-stub</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>io.envoyproxy.controlplane</groupId>
+ <artifactId>api</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>com.google.protobuf</groupId>
+ <artifactId>protobuf-java-util</artifactId>
+ </dependency>
+
+ </dependencies>
+
+</project>
\ No newline at end of file
diff --git a/dubbo-registry/dubbo-registry-xds/src/main/java/org/apache/dubbo/registry/xds/XdsRegistry.java b/dubbo-registry/dubbo-registry-xds/src/main/java/org/apache/dubbo/registry/xds/XdsRegistry.java
new file mode 100644
index 0000000..a4bd51e
--- /dev/null
+++ b/dubbo-registry/dubbo-registry-xds/src/main/java/org/apache/dubbo/registry/xds/XdsRegistry.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dubbo.registry.xds;
+
+import org.apache.dubbo.common.URL;
+import org.apache.dubbo.registry.NotifyListener;
+import org.apache.dubbo.registry.support.FailbackRegistry;
+
+/**
+ * Empty implements for xDS <br/>
+ * xDS only support `Service Discovery` mode register <br/>
+ * Used to compat past version like 2.6.x, 2.7.x with interface level register <br/>
+ * {@link XdsServiceDiscovery} is the real implementation of xDS
+ */
+public class XdsRegistry extends FailbackRegistry {
+ public XdsRegistry(URL url) {
+ super(url);
+ }
+
+ @Override
+ public boolean isAvailable() {
+ return true;
+ }
+
+ @Override
+ public void doRegister(URL url) {
+
+ }
+
+ @Override
+ public void doUnregister(URL url) {
+
+ }
+
+ @Override
+ public void doSubscribe(URL url, NotifyListener listener) {
+
+ }
+
+ @Override
+ public void doUnsubscribe(URL url, NotifyListener listener) {
+
+ }
+}
diff --git a/dubbo-registry/dubbo-registry-xds/src/main/java/org/apache/dubbo/registry/xds/XdsRegistryFactory.java b/dubbo-registry/dubbo-registry-xds/src/main/java/org/apache/dubbo/registry/xds/XdsRegistryFactory.java
new file mode 100644
index 0000000..8fa130b
--- /dev/null
+++ b/dubbo-registry/dubbo-registry-xds/src/main/java/org/apache/dubbo/registry/xds/XdsRegistryFactory.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dubbo.registry.xds;
+
+import org.apache.dubbo.common.URL;
+import org.apache.dubbo.registry.Registry;
+import org.apache.dubbo.registry.support.AbstractRegistryFactory;
+
+public class XdsRegistryFactory extends AbstractRegistryFactory {
+
+ @Override
+ protected String createRegistryCacheKey(URL url) {
+ return url.toFullString();
+ }
+
+ @Override
+ protected Registry createRegistry(URL url) {
+ return new XdsRegistry(url);
+ }
+}
diff --git a/dubbo-registry/dubbo-registry-xds/src/main/java/org/apache/dubbo/registry/xds/XdsServiceDiscovery.java b/dubbo-registry/dubbo-registry-xds/src/main/java/org/apache/dubbo/registry/xds/XdsServiceDiscovery.java
new file mode 100644
index 0000000..0651e3e
--- /dev/null
+++ b/dubbo-registry/dubbo-registry-xds/src/main/java/org/apache/dubbo/registry/xds/XdsServiceDiscovery.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dubbo.registry.xds;
+
+import org.apache.dubbo.common.URL;
+import org.apache.dubbo.registry.client.DefaultServiceInstance;
+import org.apache.dubbo.registry.client.SelfHostMetaServiceDiscovery;
+import org.apache.dubbo.registry.client.ServiceInstance;
+import org.apache.dubbo.registry.client.event.listener.ServiceInstancesChangedListener;
+import org.apache.dubbo.registry.xds.util.PilotExchanger;
+import org.apache.dubbo.registry.xds.util.protocol.message.Endpoint;
+
+import java.util.Collection;
+import java.util.Comparator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Set;
+
+public class XdsServiceDiscovery extends SelfHostMetaServiceDiscovery {
+ private PilotExchanger exchanger;
+ private URL registryURL;
+
+ @Override
+ public void doInitialize(URL registryURL) throws Exception {
+ exchanger = PilotExchanger.initialize(registryURL);
+ }
+
+ @Override
+ public void doDestroy() throws Exception {
+ exchanger.destroy();
+ }
+
+ @Override
+ public Set<String> getServices() {
+ return exchanger.getServices();
+ }
+
+ @Override
+ public List<ServiceInstance> getInstances(String serviceName) throws NullPointerException {
+ Set<Endpoint> endpoints = exchanger.getEndpoints(serviceName);
+ return changedToInstances(serviceName, endpoints);
+ }
+
+ @Override
+ public void addServiceInstancesChangedListener(ServiceInstancesChangedListener listener) throws NullPointerException, IllegalArgumentException {
+ listener.getServiceNames().forEach(serviceName -> {
+ exchanger.observeEndpoints(serviceName, (endpoints -> {
+ notifyListener(serviceName, listener, changedToInstances(serviceName, endpoints));
+ }));
+ });
+ }
+
+ private List<ServiceInstance> changedToInstances(String serviceName, Collection<Endpoint> endpoints) {
+ List<ServiceInstance> instances = new LinkedList<>();
+ endpoints.forEach(endpoint -> {
+ DefaultServiceInstance serviceInstance = new DefaultServiceInstance(serviceName, endpoint.getAddress(), endpoint.getPortValue());
+ // fill metadata by SelfHostMetaServiceDiscovery, will be fetched by RPC request
+ fillServiceInstance(serviceInstance);
+ instances.add(serviceInstance);
+ });
+ instances.sort(Comparator.comparingInt(ServiceInstance::hashCode));
+ return instances;
+ }
+}
diff --git a/dubbo-registry/dubbo-registry-xds/src/main/java/org/apache/dubbo/registry/xds/XdsServiceDiscoveryFactory.java b/dubbo-registry/dubbo-registry-xds/src/main/java/org/apache/dubbo/registry/xds/XdsServiceDiscoveryFactory.java
new file mode 100644
index 0000000..ef77ccb
--- /dev/null
+++ b/dubbo-registry/dubbo-registry-xds/src/main/java/org/apache/dubbo/registry/xds/XdsServiceDiscoveryFactory.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dubbo.registry.xds;
+
+import org.apache.dubbo.common.URL;
+import org.apache.dubbo.registry.client.AbstractServiceDiscoveryFactory;
+import org.apache.dubbo.registry.client.ServiceDiscovery;
+
+public class XdsServiceDiscoveryFactory extends AbstractServiceDiscoveryFactory {
+ @Override
+ protected ServiceDiscovery createDiscovery(URL registryURL) {
+ return new XdsServiceDiscovery();
+ }
+}
diff --git a/dubbo-registry/dubbo-registry-xds/src/main/java/org/apache/dubbo/registry/xds/util/NodeBuilder.java b/dubbo-registry/dubbo-registry-xds/src/main/java/org/apache/dubbo/registry/xds/util/NodeBuilder.java
new file mode 100644
index 0000000..fad0dbf
--- /dev/null
+++ b/dubbo-registry/dubbo-registry-xds/src/main/java/org/apache/dubbo/registry/xds/util/NodeBuilder.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dubbo.registry.xds.util;
+
+import io.envoyproxy.envoy.config.core.v3.Node;
+
+public class NodeBuilder {
+ public static Node build() {
+ // TODO: fetch data from environment
+ return Node.newBuilder()
+ .setId("sidecar~127.0.0.1~ratings-v1-7dc98c7588-lwvqd.default~default.svc.cluster.local")
+ .setCluster("ratings.default")
+ .build();
+ }
+}
diff --git a/dubbo-registry/dubbo-registry-xds/src/main/java/org/apache/dubbo/registry/xds/util/PilotExchanger.java b/dubbo-registry/dubbo-registry-xds/src/main/java/org/apache/dubbo/registry/xds/util/PilotExchanger.java
new file mode 100644
index 0000000..3d49bc1
--- /dev/null
+++ b/dubbo-registry/dubbo-registry-xds/src/main/java/org/apache/dubbo/registry/xds/util/PilotExchanger.java
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dubbo.registry.xds.util;
+
+import org.apache.dubbo.common.URL;
+import org.apache.dubbo.common.utils.CollectionUtils;
+import org.apache.dubbo.common.utils.ConcurrentHashSet;
+import org.apache.dubbo.registry.xds.util.protocol.impl.EdsProtocol;
+import org.apache.dubbo.registry.xds.util.protocol.impl.LdsProtocol;
+import org.apache.dubbo.registry.xds.util.protocol.impl.RdsProtocol;
+import org.apache.dubbo.registry.xds.util.protocol.message.Endpoint;
+import org.apache.dubbo.registry.xds.util.protocol.message.EndpointResult;
+import org.apache.dubbo.registry.xds.util.protocol.message.ListenerResult;
+import org.apache.dubbo.registry.xds.util.protocol.message.RouteResult;
+
+import java.util.Collections;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.function.Consumer;
+
+public class PilotExchanger {
+
+ private final XdsChannel xdsChannel;
+
+ private final RdsProtocol rdsProtocol;
+
+ private final EdsProtocol edsProtocol;
+
+ private ListenerResult listenerResult;
+
+ private RouteResult routeResult;
+
+ private final long observeRouteRequest;
+
+ private final Map<String, Long> domainObserveRequest = new ConcurrentHashMap<>();
+
+ private final Map<String, Set<Consumer<Set<Endpoint>>>> domainObserveConsumer = new ConcurrentHashMap<>();
+
+ private PilotExchanger(URL url) {
+ xdsChannel = new XdsChannel(url);
+ LdsProtocol ldsProtocol = new LdsProtocol(xdsChannel, NodeBuilder.build());
+ this.rdsProtocol = new RdsProtocol(xdsChannel, NodeBuilder.build());
+ this.edsProtocol = new EdsProtocol(xdsChannel, NodeBuilder.build());
+
+ this.listenerResult = ldsProtocol.getListeners();
+ this.routeResult = rdsProtocol.getResource(listenerResult.getRouteConfigNames());
+
+ // Observer RDS update
+ this.observeRouteRequest = rdsProtocol.observeResource(listenerResult.getRouteConfigNames(), (newResult) -> {
+ // check if observed domain update ( will update endpoint observation )
+ domainObserveConsumer.forEach((domain, consumer) -> {
+ Set<String> newRoute = newResult.searchDomain(domain);
+ if (!routeResult.searchDomain(domain).equals(newRoute)) {
+ // routers in observed domain has been updated
+ Long domainRequest = domainObserveRequest.get(domain);
+ if (domainRequest == null) {
+ // router list is empty when observeEndpoints() called and domainRequest has not been created yet
+ // create new observation
+ doObserveEndpoints(domain);
+ } else {
+ // update observation by domainRequest
+ edsProtocol.updateObserve(domainRequest, newRoute);
+ }
+ }
+ });
+ // update local cache
+ routeResult = newResult;
+ });
+
+ // Observe LDS updated
+ ldsProtocol.observeListeners((newListener) -> {
+ // update local cache
+ this.listenerResult = newListener;
+ // update RDS observation
+ rdsProtocol.updateObserve(observeRouteRequest, newListener.getRouteConfigNames());
+ });
+ }
+
+ public static PilotExchanger initialize(URL url) {
+ return new PilotExchanger(url);
+ }
+
+ public void destroy() {
+ xdsChannel.destroy();
+ }
+
+ public Set<String> getServices() {
+ return routeResult.getDomains();
+ }
+
+ public Set<Endpoint> getEndpoints(String domain) {
+ Set<String> cluster = routeResult.searchDomain(domain);
+ if (CollectionUtils.isNotEmpty(cluster)) {
+ EndpointResult endpoint = edsProtocol.getResource(cluster);
+ return endpoint.getEndpoints();
+ } else {
+ return Collections.emptySet();
+ }
+ }
+
+ public void observeEndpoints(String domain, Consumer<Set<Endpoint>> consumer) {
+ // store Consumer
+ domainObserveConsumer.compute(domain, (k, v) -> {
+ if (v == null) {
+ v = new ConcurrentHashSet<>();
+ }
+ // support multi-consumer
+ v.add(consumer);
+ return v;
+ });
+ if (!domainObserveRequest.containsKey(domain)) {
+ doObserveEndpoints(domain);
+ }
+ }
+
+ private void doObserveEndpoints(String domain) {
+ Set<String> router = routeResult.searchDomain(domain);
+ // if router is empty, do nothing
+ // observation will be created when RDS updates
+ if (CollectionUtils.isNotEmpty(router)) {
+ long endpointRequest =
+ edsProtocol.observeResource(
+ router,
+ endpointResult ->
+ // notify consumers
+ domainObserveConsumer.get(domain).forEach(
+ consumer1 -> consumer1.accept(endpointResult.getEndpoints())));
+ domainObserveRequest.put(domain, endpointRequest);
+ }
+ }
+}
diff --git a/dubbo-registry/dubbo-registry-xds/src/main/java/org/apache/dubbo/registry/xds/util/XdsChannel.java b/dubbo-registry/dubbo-registry-xds/src/main/java/org/apache/dubbo/registry/xds/util/XdsChannel.java
new file mode 100644
index 0000000..5336378
--- /dev/null
+++ b/dubbo-registry/dubbo-registry-xds/src/main/java/org/apache/dubbo/registry/xds/util/XdsChannel.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dubbo.registry.xds.util;
+
+import org.apache.dubbo.common.URL;
+
+import io.envoyproxy.envoy.service.discovery.v3.AggregatedDiscoveryServiceGrpc;
+import io.envoyproxy.envoy.service.discovery.v3.DeltaDiscoveryRequest;
+import io.envoyproxy.envoy.service.discovery.v3.DeltaDiscoveryResponse;
+import io.envoyproxy.envoy.service.discovery.v3.DiscoveryRequest;
+import io.envoyproxy.envoy.service.discovery.v3.DiscoveryResponse;
+import io.grpc.ManagedChannel;
+import io.grpc.ManagedChannelBuilder;
+import io.grpc.stub.StreamObserver;
+
+public class XdsChannel {
+ private final ManagedChannel channel;
+
+ protected XdsChannel(URL url) {
+ channel = ManagedChannelBuilder.forAddress(url.getHost(), url.getPort())
+ .usePlaintext()
+ .build();
+ }
+
+ public StreamObserver<DeltaDiscoveryRequest> observeDeltaDiscoveryRequest(StreamObserver<DeltaDiscoveryResponse> observer) {
+ return AggregatedDiscoveryServiceGrpc.newStub(channel).deltaAggregatedResources(observer);
+ }
+
+ public StreamObserver<DiscoveryRequest> createDeltaDiscoveryRequest(StreamObserver<DiscoveryResponse> observer) {
+ return AggregatedDiscoveryServiceGrpc.newStub(channel).streamAggregatedResources(observer);
+ }
+
+ public StreamObserver<io.envoyproxy.envoy.api.v2.DeltaDiscoveryRequest> observeDeltaDiscoveryRequestV2(StreamObserver<io.envoyproxy.envoy.api.v2.DeltaDiscoveryResponse> observer) {
+ return io.envoyproxy.envoy.service.discovery.v2.AggregatedDiscoveryServiceGrpc.newStub(channel).deltaAggregatedResources(observer);
+ }
+
+ public StreamObserver<io.envoyproxy.envoy.api.v2.DiscoveryRequest> createDeltaDiscoveryRequestV2(StreamObserver<io.envoyproxy.envoy.api.v2.DiscoveryResponse> observer) {
+ return io.envoyproxy.envoy.service.discovery.v2.AggregatedDiscoveryServiceGrpc.newStub(channel).streamAggregatedResources(observer);
+ }
+
+ public void destroy() {
+ channel.shutdown();
+ }
+}
diff --git a/dubbo-registry/dubbo-registry-xds/src/main/java/org/apache/dubbo/registry/xds/util/protocol/AbstractProtocol.java b/dubbo-registry/dubbo-registry-xds/src/main/java/org/apache/dubbo/registry/xds/util/protocol/AbstractProtocol.java
new file mode 100644
index 0000000..13ac5be
--- /dev/null
+++ b/dubbo-registry/dubbo-registry-xds/src/main/java/org/apache/dubbo/registry/xds/util/protocol/AbstractProtocol.java
@@ -0,0 +1,262 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dubbo.registry.xds.util.protocol;
+
+import org.apache.dubbo.common.logger.Logger;
+import org.apache.dubbo.common.logger.LoggerFactory;
+import org.apache.dubbo.registry.xds.util.XdsChannel;
+
+import io.envoyproxy.envoy.config.core.v3.Node;
+import io.envoyproxy.envoy.service.discovery.v3.DeltaDiscoveryRequest;
+import io.envoyproxy.envoy.service.discovery.v3.DeltaDiscoveryResponse;
+import io.envoyproxy.envoy.service.discovery.v3.DiscoveryRequest;
+import io.envoyproxy.envoy.service.discovery.v3.DiscoveryResponse;
+import io.grpc.stub.StreamObserver;
+
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.Consumer;
+
+public abstract class AbstractProtocol<T, S extends DeltaResource<T>> implements XdsProtocol<T>{
+
+ private static final Logger logger = LoggerFactory.getLogger(AbstractProtocol.class);
+
+ protected final XdsChannel xdsChannel;
+
+ protected final Node node;
+
+ /**
+ * Store Request Parameter ( resourceNames )
+ * K - requestId, V - resourceNames
+ */
+ protected final Map<Long, Set<String>> requestParam = new ConcurrentHashMap<>();
+
+ /**
+ * Store ADS Request Observer ( StreamObserver in Streaming Request )
+ * K - requestId, V - StreamObserver
+ */
+ private final Map<Long, StreamObserver<DiscoveryRequest>> requestObserverMap = new ConcurrentHashMap<>();
+
+ /**
+ * Store Delta-ADS Request Observer ( StreamObserver in Streaming Request )
+ * K - requestId, V - StreamObserver
+ */
+ private final Map<Long, StreamObserver<DeltaDiscoveryRequest>> deltaRequestObserverMap = new ConcurrentHashMap<>();
+
+ /**
+ * Store CompletableFuture for Request ( used to fetch async result in ResponseObserver )
+ * K - requestId, V - CompletableFuture
+ */
+ private final Map<Long, CompletableFuture<T>> streamResult = new ConcurrentHashMap<>();
+
+ /**
+ * Store consumers for Observers ( will consume message produced by Delta-ADS )
+ * K - requestId, V - Consumer
+ */
+ private final Map<Long, Consumer<T>> consumers = new ConcurrentHashMap<>();
+
+ protected final AtomicLong requestId = new AtomicLong(0);
+
+ public AbstractProtocol(XdsChannel xdsChannel, Node node) {
+ this.xdsChannel = xdsChannel;
+ this.node = node;
+ }
+
+ /**
+ * Abstract method to obtain Type-URL from sub-class
+ *
+ * @return Type-URL of xDS
+ */
+ public abstract String getTypeUrl();
+
+ @Override
+ public T getResource(Set<String> resourceNames) {
+ long request = requestId.getAndIncrement();
+
+ // Store Request Parameter, which will be used for ACK
+ requestParam.put(request, resourceNames);
+
+ // create observer
+ StreamObserver<DiscoveryRequest> requestObserver = xdsChannel.createDeltaDiscoveryRequest(new ResponseObserver(request));
+
+ // use future to get async result
+ CompletableFuture<T> future = new CompletableFuture<>();
+ requestObserverMap.put(request, requestObserver);
+ streamResult.put(request, future);
+
+ // send request to control panel
+ requestObserver.onNext(buildDiscoveryRequest(resourceNames));
+
+ try {
+ // get result
+ return future.get();
+ } catch (InterruptedException | ExecutionException e) {
+ logger.error("Error occur when request control panel.");
+ return null;
+ } finally {
+ // close observer
+ requestObserver.onCompleted();
+
+ // remove temp
+ streamResult.remove(request);
+ requestObserverMap.remove(request);
+ requestParam.remove(request);
+ }
+ }
+
+ @Override
+ public long observeResource(Set<String> resourceNames, Consumer<T> consumer) {
+ long request = requestId.getAndIncrement();
+
+ // Store Request Parameter, which will be used for ACK
+ requestParam.put(request, resourceNames);
+
+ // call once for full data
+ consumer.accept(getResource(resourceNames));
+
+ consumers.put(request, consumer);
+ deltaRequestObserverMap.compute(request, (k, v) -> {
+ // create Delta-ADS observer
+ v= xdsChannel.observeDeltaDiscoveryRequest(new DeltaResponseObserver(request));
+
+ // send observe request
+ v.onNext(buildDeltaDiscoveryRequest(resourceNames));
+ return v;
+ });
+ return request;
+ }
+
+ @Override
+ public void updateObserve(long request, Set<String> resourceNames) {
+ // send difference in resourceNames
+ deltaRequestObserverMap.get(request).onNext(buildDeltaDiscoveryRequest(request, resourceNames));
+ }
+
+ protected DiscoveryRequest buildDiscoveryRequest(Set<String> resourceNames) {
+ return DiscoveryRequest.newBuilder()
+ .setNode(node)
+ .setTypeUrl(getTypeUrl())
+ .addAllResourceNames(resourceNames)
+ .build();
+ }
+
+ protected DiscoveryRequest buildDiscoveryRequest(Set<String> resourceNames, DiscoveryResponse response) {
+ // for ACK
+ return DiscoveryRequest.newBuilder()
+ .setNode(node)
+ .setTypeUrl(getTypeUrl())
+ .addAllResourceNames(resourceNames)
+ .setVersionInfo(response.getVersionInfo())
+ .setResponseNonce(response.getNonce())
+ .build();
+ }
+
+ protected DeltaDiscoveryRequest buildDeltaDiscoveryRequest(Set<String> resourceNames) {
+ return DeltaDiscoveryRequest.newBuilder()
+ .setNode(node)
+ .setTypeUrl(getTypeUrl())
+ .addAllResourceNamesSubscribe(resourceNames)
+ .build();
+ }
+
+ protected DeltaDiscoveryRequest buildDeltaDiscoveryRequest(long request, Set<String> resourceNames) {
+ // compare with previous
+ Set<String> previous = requestParam.get(request);
+ Set<String> unsubscribe = new HashSet<String>(previous) {{
+ removeAll(resourceNames);
+ }};
+ requestParam.put(request, resourceNames);
+ return DeltaDiscoveryRequest.newBuilder()
+ .setNode(node)
+ .setTypeUrl(getTypeUrl())
+ .addAllResourceNamesUnsubscribe(unsubscribe)
+ .addAllResourceNamesSubscribe(resourceNames)
+ .build();
+ }
+
+ private DeltaDiscoveryRequest buildDeltaDiscoveryRequest(Set<String> resourceNames, DeltaDiscoveryResponse response) {
+ // for ACK
+ return DeltaDiscoveryRequest.newBuilder()
+ .setNode(node)
+ .setTypeUrl(getTypeUrl())
+ .addAllResourceNamesSubscribe(resourceNames)
+ .setResponseNonce(response.getNonce())
+ .build();
+ }
+
+ protected abstract T decodeDiscoveryResponse(DiscoveryResponse response);
+
+ protected abstract S decodeDeltaDiscoveryResponse(DeltaDiscoveryResponse response, S previous);
+
+ private class ResponseObserver implements StreamObserver<DiscoveryResponse> {
+ private final long requestId;
+
+ public ResponseObserver(long requestId) {
+ this.requestId = requestId;
+ }
+
+ @Override
+ public void onNext(DiscoveryResponse value) {
+ T result = decodeDiscoveryResponse(value);
+ requestObserverMap.get(requestId).onNext(buildDiscoveryRequest(requestParam.get(requestId), value));
+ streamResult.get(requestId).complete(result);
+ }
+
+ @Override
+ public void onError(Throwable t) {
+ logger.error("xDS Client received error message! detail:", t);
+ }
+
+ @Override
+ public void onCompleted() {
+ // ignore
+ }
+ }
+
+ private class DeltaResponseObserver implements StreamObserver<DeltaDiscoveryResponse> {
+ private S delta = null;
+ private final long requestId;
+
+ public DeltaResponseObserver(long requestId) {
+ this.requestId = requestId;
+ }
+
+ @Override
+ public void onNext(DeltaDiscoveryResponse value) {
+ delta = decodeDeltaDiscoveryResponse(value, delta);
+ T routes = delta.getResource();
+ consumers.get(requestId).accept(routes);
+ deltaRequestObserverMap.get(requestId).onNext(buildDeltaDiscoveryRequest(requestParam.get(requestId), value));
+ }
+
+ @Override
+ public void onError(Throwable t) {
+ logger.error("xDS Client received error message! detail:", t);
+ }
+
+ @Override
+ public void onCompleted() {
+ // ignore
+ }
+ }
+
+}
diff --git a/dubbo-registry/dubbo-registry-xds/src/main/java/org/apache/dubbo/registry/xds/util/protocol/DeltaResource.java b/dubbo-registry/dubbo-registry-xds/src/main/java/org/apache/dubbo/registry/xds/util/protocol/DeltaResource.java
new file mode 100644
index 0000000..bcae39c
--- /dev/null
+++ b/dubbo-registry/dubbo-registry-xds/src/main/java/org/apache/dubbo/registry/xds/util/protocol/DeltaResource.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dubbo.registry.xds.util.protocol;
+
+/**
+ * A interface for resources in xDS, which can be updated by ADS delta stream
+ * <br/>
+ * This interface is design to unify the way of fetching data in delta stream
+ * in {@link org.apache.dubbo.registry.xds.util.PilotExchanger}
+ */
+public interface DeltaResource<T> {
+ /**
+ * Get resource from delta stream
+ *
+ * @return the newest resource from stream
+ */
+ T getResource();
+}
diff --git a/dubbo-registry/dubbo-registry-xds/src/main/java/org/apache/dubbo/registry/xds/util/protocol/XdsProtocol.java b/dubbo-registry/dubbo-registry-xds/src/main/java/org/apache/dubbo/registry/xds/util/protocol/XdsProtocol.java
new file mode 100644
index 0000000..25ec3fd
--- /dev/null
+++ b/dubbo-registry/dubbo-registry-xds/src/main/java/org/apache/dubbo/registry/xds/util/protocol/XdsProtocol.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dubbo.registry.xds.util.protocol;
+
+import java.util.Set;
+import java.util.function.Consumer;
+
+public interface XdsProtocol<T> {
+ /**
+ * Gets all {@link T resource} by the specified resource name.
+ * For LDS, the {@param resourceNames} is ignored
+ *
+ * @param resourceNames specified resource name
+ * @return resources, null if request failed
+ */
+ T getResource(Set<String> resourceNames);
+
+ /**
+ * Add a observer resource with {@link Consumer}
+ *
+ * @param resourceNames specified resource name
+ * @param consumer resource notifier, will be called when resource updated
+ * @return requestId, used when resourceNames update with {@link XdsProtocol#updateObserve(long, Set)}
+ */
+ long observeResource(Set<String> resourceNames, Consumer<T> consumer);
+
+ /**
+ * Update observed resource list in {@link XdsProtocol#observeResource(Set, Consumer)}
+ *
+ * @param request requestId returned by {@link XdsProtocol#observeResource(Set, Consumer)}
+ * @param resourceNames new resource name list to observe
+ */
+ void updateObserve(long request, Set<String> resourceNames);
+}
diff --git a/dubbo-registry/dubbo-registry-xds/src/main/java/org/apache/dubbo/registry/xds/util/protocol/delta/DeltaEndpoint.java b/dubbo-registry/dubbo-registry-xds/src/main/java/org/apache/dubbo/registry/xds/util/protocol/delta/DeltaEndpoint.java
new file mode 100644
index 0000000..16823aa
--- /dev/null
+++ b/dubbo-registry/dubbo-registry-xds/src/main/java/org/apache/dubbo/registry/xds/util/protocol/delta/DeltaEndpoint.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dubbo.registry.xds.util.protocol.delta;
+
+import org.apache.dubbo.common.utils.CollectionUtils;
+import org.apache.dubbo.registry.xds.util.protocol.DeltaResource;
+import org.apache.dubbo.registry.xds.util.protocol.message.Endpoint;
+import org.apache.dubbo.registry.xds.util.protocol.message.EndpointResult;
+
+import java.util.Collection;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.stream.Collectors;
+
+public class DeltaEndpoint implements DeltaResource<EndpointResult> {
+ private final Map<String, Set<Endpoint>> data = new ConcurrentHashMap<>();
+
+ public void addResource(String resourceName, Set<Endpoint> endpoints) {
+ data.put(resourceName, endpoints);
+ }
+
+ public void removeResource(Collection<String> resourceName) {
+ if (CollectionUtils.isNotEmpty(resourceName)) {
+ resourceName.forEach(data::remove);
+ }
+ }
+
+ @Override
+ public EndpointResult getResource() {
+ Set<Endpoint> set = data.values().stream()
+ .flatMap(Set::stream)
+ .collect(Collectors.toSet());
+ return new EndpointResult(set);
+ }
+
+}
diff --git a/dubbo-registry/dubbo-registry-xds/src/main/java/org/apache/dubbo/registry/xds/util/protocol/delta/DeltaListener.java b/dubbo-registry/dubbo-registry-xds/src/main/java/org/apache/dubbo/registry/xds/util/protocol/delta/DeltaListener.java
new file mode 100644
index 0000000..8b66217
--- /dev/null
+++ b/dubbo-registry/dubbo-registry-xds/src/main/java/org/apache/dubbo/registry/xds/util/protocol/delta/DeltaListener.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dubbo.registry.xds.util.protocol.delta;
+
+import org.apache.dubbo.common.utils.CollectionUtils;
+import org.apache.dubbo.registry.xds.util.protocol.DeltaResource;
+import org.apache.dubbo.registry.xds.util.protocol.message.ListenerResult;
+
+import java.util.Collection;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.stream.Collectors;
+
+public class DeltaListener implements DeltaResource<ListenerResult> {
+ private final Map<String, Set<String>> data = new ConcurrentHashMap<>();
+
+ public void addResource(String resourceName, Set<String> listeners) {
+ data.put(resourceName, listeners);
+ }
+
+ public void removeResource(Collection<String> resourceName) {
+ if (CollectionUtils.isNotEmpty(resourceName)) {
+ resourceName.forEach(data::remove);
+ }
+ }
+
+ @Override
+ public ListenerResult getResource() {
+ Set<String> set = data.values().stream()
+ .flatMap(Set::stream)
+ .collect(Collectors.toSet());
+ return new ListenerResult(set);
+ }
+}
diff --git a/dubbo-registry/dubbo-registry-xds/src/main/java/org/apache/dubbo/registry/xds/util/protocol/delta/DeltaRoute.java b/dubbo-registry/dubbo-registry-xds/src/main/java/org/apache/dubbo/registry/xds/util/protocol/delta/DeltaRoute.java
new file mode 100644
index 0000000..71fdb47
--- /dev/null
+++ b/dubbo-registry/dubbo-registry-xds/src/main/java/org/apache/dubbo/registry/xds/util/protocol/delta/DeltaRoute.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dubbo.registry.xds.util.protocol.delta;
+
+import org.apache.dubbo.common.utils.CollectionUtils;
+import org.apache.dubbo.registry.xds.util.protocol.DeltaResource;
+import org.apache.dubbo.registry.xds.util.protocol.message.RouteResult;
+
+import java.util.Collection;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+
+public class DeltaRoute implements DeltaResource<RouteResult> {
+ private final Map<String, Map<String, Set<String>>> data = new ConcurrentHashMap<>();
+
+ public void addResource(String resourceName, Map<String, Set<String>> route) {
+ data.put(resourceName, route);
+ }
+
+ public void removeResource(Collection<String> resourceName) {
+ if (CollectionUtils.isNotEmpty(resourceName)) {
+ resourceName.forEach(data::remove);
+ }
+ }
+
+ @Override
+ public RouteResult getResource() {
+ Map<String, Set<String>> result = new ConcurrentHashMap<>();
+ data.values().forEach(result::putAll);
+ return new RouteResult(result);
+ }
+}
diff --git a/dubbo-registry/dubbo-registry-xds/src/main/java/org/apache/dubbo/registry/xds/util/protocol/impl/EdsProtocol.java b/dubbo-registry/dubbo-registry-xds/src/main/java/org/apache/dubbo/registry/xds/util/protocol/impl/EdsProtocol.java
new file mode 100644
index 0000000..7476c00
--- /dev/null
+++ b/dubbo-registry/dubbo-registry-xds/src/main/java/org/apache/dubbo/registry/xds/util/protocol/impl/EdsProtocol.java
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dubbo.registry.xds.util.protocol.impl;
+
+import org.apache.dubbo.common.logger.Logger;
+import org.apache.dubbo.common.logger.LoggerFactory;
+import org.apache.dubbo.registry.xds.util.XdsChannel;
+import org.apache.dubbo.registry.xds.util.protocol.AbstractProtocol;
+import org.apache.dubbo.registry.xds.util.protocol.delta.DeltaEndpoint;
+import org.apache.dubbo.registry.xds.util.protocol.message.Endpoint;
+import org.apache.dubbo.registry.xds.util.protocol.message.EndpointResult;
+
+import com.google.protobuf.Any;
+import com.google.protobuf.InvalidProtocolBufferException;
+import io.envoyproxy.envoy.config.core.v3.HealthStatus;
+import io.envoyproxy.envoy.config.core.v3.Node;
+import io.envoyproxy.envoy.config.core.v3.SocketAddress;
+import io.envoyproxy.envoy.config.endpoint.v3.ClusterLoadAssignment;
+import io.envoyproxy.envoy.config.endpoint.v3.LbEndpoint;
+import io.envoyproxy.envoy.service.discovery.v3.DeltaDiscoveryResponse;
+import io.envoyproxy.envoy.service.discovery.v3.DiscoveryResponse;
+import io.envoyproxy.envoy.service.discovery.v3.Resource;
+
+import java.util.Objects;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+public class EdsProtocol extends AbstractProtocol<EndpointResult, DeltaEndpoint> {
+
+ private static final Logger logger = LoggerFactory.getLogger(LdsProtocol.class);
+
+ public EdsProtocol(XdsChannel xdsChannel, Node node) {
+ super(xdsChannel, node);
+ }
+
+ @Override
+ public String getTypeUrl() {
+ return "type.googleapis.com/envoy.config.endpoint.v3.ClusterLoadAssignment";
+ }
+
+ @Override
+ protected EndpointResult decodeDiscoveryResponse(DiscoveryResponse response) {
+ if (getTypeUrl().equals(response.getTypeUrl())) {
+ Set<Endpoint> set = response.getResourcesList().stream()
+ .map(EdsProtocol::unpackClusterLoadAssignment)
+ .filter(Objects::nonNull)
+ .flatMap((e) -> decodeResourceToEndpoint(e).stream())
+ .collect(Collectors.toSet());
+ return new EndpointResult(set);
+ }
+ return new EndpointResult();
+ }
+
+ @Override
+ protected DeltaEndpoint decodeDeltaDiscoveryResponse(DeltaDiscoveryResponse response, DeltaEndpoint previous) {
+ DeltaEndpoint deltaEndpoint = previous;
+ if (deltaEndpoint == null) {
+ deltaEndpoint = new DeltaEndpoint();
+ }
+ if (getTypeUrl().equals(response.getTypeUrl())) {
+ deltaEndpoint.removeResource(response.getRemovedResourcesList());
+ for (Resource resource : response.getResourcesList()) {
+ ClusterLoadAssignment unpackedResource = unpackClusterLoadAssignment(resource.getResource());
+ if (unpackedResource == null) {
+ continue;
+ }
+ deltaEndpoint.addResource(resource.getName(), decodeResourceToEndpoint(unpackedResource));
+ }
+ }
+ return previous;
+ }
+
+ private static Set<Endpoint> decodeResourceToEndpoint(ClusterLoadAssignment resource) {
+ return resource.getEndpointsList().stream()
+ .flatMap((e) -> e.getLbEndpointsList().stream())
+ .map(EdsProtocol::decodeLbEndpointToEndpoint)
+ .collect(Collectors.toSet());
+ }
+
+ private static Endpoint decodeLbEndpointToEndpoint(LbEndpoint lbEndpoint) {
+ Endpoint endpoint = new Endpoint();
+ SocketAddress address = lbEndpoint.getEndpoint().getAddress().getSocketAddress();
+ endpoint.setAddress(address.getAddress());
+ endpoint.setPortValue(address.getPortValue());
+ boolean healthy = HealthStatus.HEALTHY.equals(lbEndpoint.getHealthStatus()) ||
+ HealthStatus.UNKNOWN.equals(lbEndpoint.getHealthStatus());
+ endpoint.setHealthy(healthy);
+ endpoint.setWeight(lbEndpoint.getLoadBalancingWeight().getValue());
+ return endpoint;
+ }
+
+ private static ClusterLoadAssignment unpackClusterLoadAssignment(Any any) {
+ try {
+ return any.unpack(ClusterLoadAssignment.class);
+ } catch (InvalidProtocolBufferException e) {
+ logger.error("Error occur when decode xDS response.", e);
+ return null;
+ }
+ }
+}
diff --git a/dubbo-registry/dubbo-registry-xds/src/main/java/org/apache/dubbo/registry/xds/util/protocol/impl/LdsProtocol.java b/dubbo-registry/dubbo-registry-xds/src/main/java/org/apache/dubbo/registry/xds/util/protocol/impl/LdsProtocol.java
new file mode 100644
index 0000000..3116efa
--- /dev/null
+++ b/dubbo-registry/dubbo-registry-xds/src/main/java/org/apache/dubbo/registry/xds/util/protocol/impl/LdsProtocol.java
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dubbo.registry.xds.util.protocol.impl;
+
+import org.apache.dubbo.common.logger.Logger;
+import org.apache.dubbo.common.logger.LoggerFactory;
+import org.apache.dubbo.registry.xds.util.XdsChannel;
+import org.apache.dubbo.registry.xds.util.protocol.AbstractProtocol;
+import org.apache.dubbo.registry.xds.util.protocol.delta.DeltaListener;
+import org.apache.dubbo.registry.xds.util.protocol.message.ListenerResult;
+
+import com.google.protobuf.Any;
+import com.google.protobuf.InvalidProtocolBufferException;
+import io.envoyproxy.envoy.config.core.v3.Node;
+import io.envoyproxy.envoy.config.listener.v3.Filter;
+import io.envoyproxy.envoy.config.listener.v3.Listener;
+import io.envoyproxy.envoy.extensions.filters.network.http_connection_manager.v3.HttpConnectionManager;
+import io.envoyproxy.envoy.extensions.filters.network.http_connection_manager.v3.Rds;
+import io.envoyproxy.envoy.service.discovery.v3.DeltaDiscoveryResponse;
+import io.envoyproxy.envoy.service.discovery.v3.DiscoveryResponse;
+import io.envoyproxy.envoy.service.discovery.v3.Resource;
+
+import java.util.Objects;
+import java.util.Set;
+import java.util.function.Consumer;
+import java.util.stream.Collectors;
+
+public class LdsProtocol extends AbstractProtocol<ListenerResult, DeltaListener> {
+
+ private static final Logger logger = LoggerFactory.getLogger(LdsProtocol.class);
+
+ public LdsProtocol(XdsChannel xdsChannel, Node node) {
+ super(xdsChannel, node);
+ }
+
+ @Override
+ public String getTypeUrl() {
+ return "type.googleapis.com/envoy.config.listener.v3.Listener";
+ }
+
+ public ListenerResult getListeners() {
+ return getResource(null);
+ }
+
+ public void observeListeners(Consumer<ListenerResult> consumer) {
+ observeResource(null,consumer);
+ }
+
+ @Override
+ protected ListenerResult decodeDiscoveryResponse(DiscoveryResponse response) {
+ if (getTypeUrl().equals(response.getTypeUrl())) {
+ Set<String> set = response.getResourcesList().stream()
+ .map(LdsProtocol::unpackListener)
+ .filter(Objects::nonNull)
+ .flatMap((e) -> decodeResourceToListener(e).stream())
+ .collect(Collectors.toSet());
+ return new ListenerResult(set);
+ }
+ return new ListenerResult();
+ }
+
+ @Override
+ protected DeltaListener decodeDeltaDiscoveryResponse(DeltaDiscoveryResponse response, DeltaListener previous) {
+ DeltaListener deltaListener = previous;
+ if (deltaListener == null) {
+ deltaListener = new DeltaListener();
+ }
+ if (getTypeUrl().equals(response.getTypeUrl())) {
+ deltaListener.removeResource(response.getRemovedResourcesList());
+ for (Resource resource : response.getResourcesList()) {
+ Listener unpackedResource = unpackListener(resource.getResource());
+ if (unpackedResource == null) {
+ continue;
+ }
+ deltaListener.addResource(resource.getName(), decodeResourceToListener(unpackedResource));
+ }
+ }
+ return deltaListener;
+ }
+
+ private Set<String> decodeResourceToListener(Listener resource) {
+ return resource.getFilterChainsList().stream()
+ .flatMap((e) -> e.getFiltersList().stream())
+ .map(Filter::getTypedConfig)
+ .map(LdsProtocol::unpackHttpConnectionManager)
+ .filter(Objects::nonNull)
+ .map(HttpConnectionManager::getRds)
+ .map(Rds::getRouteConfigName)
+ .collect(Collectors.toSet());
+ }
+
+ private static Listener unpackListener(Any any) {
+ try {
+ return any.unpack(Listener.class);
+ } catch (InvalidProtocolBufferException e) {
+ logger.error("Error occur when decode xDS response.", e);
+ return null;
+ }
+ }
+
+ private static HttpConnectionManager unpackHttpConnectionManager(Any any) {
+ try {
+ return any.unpack(HttpConnectionManager.class);
+ } catch (InvalidProtocolBufferException e) {
+ logger.error("Error occur when decode xDS response.", e);
+ return null;
+ }
+ }
+}
diff --git a/dubbo-registry/dubbo-registry-xds/src/main/java/org/apache/dubbo/registry/xds/util/protocol/impl/RdsProtocol.java b/dubbo-registry/dubbo-registry-xds/src/main/java/org/apache/dubbo/registry/xds/util/protocol/impl/RdsProtocol.java
new file mode 100644
index 0000000..a85b2ea
--- /dev/null
+++ b/dubbo-registry/dubbo-registry-xds/src/main/java/org/apache/dubbo/registry/xds/util/protocol/impl/RdsProtocol.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dubbo.registry.xds.util.protocol.impl;
+
+import org.apache.dubbo.common.logger.Logger;
+import org.apache.dubbo.common.logger.LoggerFactory;
+import org.apache.dubbo.registry.xds.util.XdsChannel;
+import org.apache.dubbo.registry.xds.util.protocol.AbstractProtocol;
+import org.apache.dubbo.registry.xds.util.protocol.delta.DeltaRoute;
+import org.apache.dubbo.registry.xds.util.protocol.message.RouteResult;
+
+import com.google.protobuf.Any;
+import com.google.protobuf.InvalidProtocolBufferException;
+import io.envoyproxy.envoy.config.core.v3.Node;
+import io.envoyproxy.envoy.config.route.v3.Route;
+import io.envoyproxy.envoy.config.route.v3.RouteAction;
+import io.envoyproxy.envoy.config.route.v3.RouteConfiguration;
+import io.envoyproxy.envoy.service.discovery.v3.DeltaDiscoveryResponse;
+import io.envoyproxy.envoy.service.discovery.v3.DiscoveryResponse;
+import io.envoyproxy.envoy.service.discovery.v3.Resource;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+public class RdsProtocol extends AbstractProtocol<RouteResult, DeltaRoute> {
+
+ private static final Logger logger = LoggerFactory.getLogger(AbstractProtocol.class);
+
+ public RdsProtocol(XdsChannel xdsChannel, Node node) {
+ super(xdsChannel, node);
+ }
+
+ @Override
+ public String getTypeUrl() {
+ return "type.googleapis.com/envoy.config.route.v3.RouteConfiguration";
+ }
+
+ @Override
+ protected RouteResult decodeDiscoveryResponse(DiscoveryResponse response) {
+ if (getTypeUrl().equals(response.getTypeUrl())) {
+ Map<String, Set<String>> map = response.getResourcesList().stream()
+ .map(RdsProtocol::unpackRouteConfiguration)
+ .filter(Objects::nonNull)
+ .map(RdsProtocol::decodeResourceToListener)
+ .reduce((a, b) -> {
+ a.putAll(b);
+ return a;
+ }).orElse(new HashMap<>());
+ return new RouteResult(map);
+ }
+ return new RouteResult();
+ }
+
+ @Override
+ protected DeltaRoute decodeDeltaDiscoveryResponse(DeltaDiscoveryResponse response, DeltaRoute previous) {
+ DeltaRoute deltaRoute = previous;
+ if (deltaRoute == null) {
+ deltaRoute = new DeltaRoute();
+ }
+ if (getTypeUrl().equals(response.getTypeUrl())) {
+ deltaRoute.removeResource(response.getRemovedResourcesList());
+ for (Resource resource : response.getResourcesList()) {
+ RouteConfiguration unpackedResource = unpackRouteConfiguration(resource.getResource());
+ if (unpackedResource == null) {
+ continue;
+ }
+ deltaRoute.addResource(resource.getName(), decodeResourceToListener(unpackedResource));
+ }
+ }
+ return deltaRoute;
+ }
+
+ private static Map<String, Set<String>> decodeResourceToListener(RouteConfiguration resource) {
+ Map<String, Set<String>> map = new HashMap<>();
+ resource.getVirtualHostsList()
+ .forEach(virtualHost -> {
+ Set<String> cluster = virtualHost.getRoutesList().stream()
+ .map(Route::getRoute)
+ .map(RouteAction::getCluster)
+ .collect(Collectors.toSet());
+ for (String domain : virtualHost.getDomainsList()) {
+ map.put(domain, cluster);
+ }
+ });
+ return map;
+ }
+
+ private static RouteConfiguration unpackRouteConfiguration(Any any) {
+ try {
+ return any.unpack(RouteConfiguration.class);
+ } catch (InvalidProtocolBufferException e) {
+ logger.error("Error occur when decode xDS response.", e);
+ return null;
+ }
+ }
+
+
+}
diff --git a/dubbo-registry/dubbo-registry-xds/src/main/java/org/apache/dubbo/registry/xds/util/protocol/message/Endpoint.java b/dubbo-registry/dubbo-registry-xds/src/main/java/org/apache/dubbo/registry/xds/util/protocol/message/Endpoint.java
new file mode 100644
index 0000000..b966bda
--- /dev/null
+++ b/dubbo-registry/dubbo-registry-xds/src/main/java/org/apache/dubbo/registry/xds/util/protocol/message/Endpoint.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dubbo.registry.xds.util.protocol.message;
+
+import java.util.Objects;
+
+public class Endpoint {
+ private String address;
+ private int portValue;
+ private boolean healthy;
+ private int weight;
+
+ public String getAddress() {
+ return address;
+ }
+
+ public void setAddress(String address) {
+ this.address = address;
+ }
+
+ public int getPortValue() {
+ return portValue;
+ }
+
+ public void setPortValue(int portValue) {
+ this.portValue = portValue;
+ }
+
+ public boolean isHealthy() {
+ return healthy;
+ }
+
+ public void setHealthy(boolean healthy) {
+ this.healthy = healthy;
+ }
+
+ public int getWeight() {
+ return weight;
+ }
+
+ public void setWeight(int weight) {
+ this.weight = weight;
+ }
+
+ @Override
+ public String toString() {
+ return "Endpoint{" +
+ "address='" + address + '\'' +
+ ", portValue='" + portValue + '\'' +
+ ", healthy=" + healthy +
+ ", weight=" + weight +
+ '}';
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ Endpoint endpoint = (Endpoint) o;
+ return healthy == endpoint.healthy &&
+ weight == endpoint.weight &&
+ Objects.equals(address, endpoint.address) &&
+ Objects.equals(portValue, endpoint.portValue);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(address, portValue, healthy, weight);
+ }
+}
diff --git a/dubbo-registry/dubbo-registry-xds/src/main/java/org/apache/dubbo/registry/xds/util/protocol/message/EndpointResult.java b/dubbo-registry/dubbo-registry-xds/src/main/java/org/apache/dubbo/registry/xds/util/protocol/message/EndpointResult.java
new file mode 100644
index 0000000..fc75eee
--- /dev/null
+++ b/dubbo-registry/dubbo-registry-xds/src/main/java/org/apache/dubbo/registry/xds/util/protocol/message/EndpointResult.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dubbo.registry.xds.util.protocol.message;
+
+import org.apache.dubbo.common.utils.ConcurrentHashSet;
+
+import java.util.Objects;
+import java.util.Set;
+
+public class EndpointResult {
+ private Set<Endpoint> endpoints;
+
+ public EndpointResult() {
+ this.endpoints = new ConcurrentHashSet<>();
+ }
+
+ public EndpointResult(Set<Endpoint> endpoints) {
+ this.endpoints = endpoints;
+ }
+
+ public Set<Endpoint> getEndpoints() {
+ return endpoints;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ EndpointResult that = (EndpointResult) o;
+ return Objects.equals(endpoints, that.endpoints);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(endpoints);
+ }
+
+ @Override
+ public String toString() {
+ return "EndpointResult{" +
+ "endpoints=" + endpoints +
+ '}';
+ }
+}
diff --git a/dubbo-registry/dubbo-registry-xds/src/main/java/org/apache/dubbo/registry/xds/util/protocol/message/ListenerResult.java b/dubbo-registry/dubbo-registry-xds/src/main/java/org/apache/dubbo/registry/xds/util/protocol/message/ListenerResult.java
new file mode 100644
index 0000000..f6d6a9c
--- /dev/null
+++ b/dubbo-registry/dubbo-registry-xds/src/main/java/org/apache/dubbo/registry/xds/util/protocol/message/ListenerResult.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dubbo.registry.xds.util.protocol.message;
+
+import org.apache.dubbo.common.utils.ConcurrentHashSet;
+
+import java.util.Objects;
+import java.util.Set;
+
+public class ListenerResult {
+ private Set<String> routeConfigNames;
+
+ public ListenerResult() {
+ this.routeConfigNames = new ConcurrentHashSet<>();
+ }
+
+ public ListenerResult(Set<String> routeConfigNames) {
+ this.routeConfigNames = routeConfigNames;
+ }
+
+ public Set<String> getRouteConfigNames() {
+ return routeConfigNames;
+ }
+
+ public void setRouteConfigNames(Set<String> routeConfigNames) {
+ this.routeConfigNames = routeConfigNames;
+ }
+
+ public void mergeRouteConfigNames(Set<String> names) {
+ this.routeConfigNames.addAll(names);
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ ListenerResult listenerResult = (ListenerResult) o;
+ return Objects.equals(routeConfigNames, listenerResult.routeConfigNames);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(routeConfigNames);
+ }
+
+ @Override
+ public String toString() {
+ return "ListenerResult{" +
+ "routeConfigNames=" + routeConfigNames +
+ '}';
+ }
+}
diff --git a/dubbo-registry/dubbo-registry-xds/src/main/java/org/apache/dubbo/registry/xds/util/protocol/message/RouteResult.java b/dubbo-registry/dubbo-registry-xds/src/main/java/org/apache/dubbo/registry/xds/util/protocol/message/RouteResult.java
new file mode 100644
index 0000000..3aac915
--- /dev/null
+++ b/dubbo-registry/dubbo-registry-xds/src/main/java/org/apache/dubbo/registry/xds/util/protocol/message/RouteResult.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dubbo.registry.xds.util.protocol.message;
+
+import org.apache.dubbo.common.utils.ConcurrentHashSet;
+
+import java.util.Collections;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+
+public class RouteResult {
+ private final Map<String, Set<String>> domainMap;
+
+ public RouteResult() {
+ this.domainMap = new ConcurrentHashMap<>();
+ }
+
+ public RouteResult(Map<String, Set<String>> domainMap) {
+ this.domainMap = domainMap;
+ }
+
+ public boolean isNotEmpty() {
+ return !domainMap.isEmpty();
+ }
+
+ public Set<String> searchDomain(String domain) {
+ return domainMap.getOrDefault(domain, new ConcurrentHashSet<>());
+ }
+
+ public Set<String> getDomains() {
+ return Collections.unmodifiableSet(domainMap.keySet());
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ RouteResult that = (RouteResult) o;
+ return Objects.equals(domainMap, that.domainMap);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(domainMap);
+ }
+
+ @Override
+ public String toString() {
+ return "RouteResult{" +
+ "domainMap=" + domainMap +
+ '}';
+ }
+}
diff --git a/dubbo-registry/dubbo-registry-xds/src/main/resources/META-INF/dubbo/internal/org.apache.dubbo.registry.RegistryFactory b/dubbo-registry/dubbo-registry-xds/src/main/resources/META-INF/dubbo/internal/org.apache.dubbo.registry.RegistryFactory
new file mode 100644
index 0000000..453d3d3
--- /dev/null
+++ b/dubbo-registry/dubbo-registry-xds/src/main/resources/META-INF/dubbo/internal/org.apache.dubbo.registry.RegistryFactory
@@ -0,0 +1 @@
+xds=org.apache.dubbo.registry.xds.XdsRegistryFactory
\ No newline at end of file
diff --git a/dubbo-registry/dubbo-registry-xds/src/main/resources/META-INF/dubbo/internal/org.apache.dubbo.registry.client.ServiceDiscovery b/dubbo-registry/dubbo-registry-xds/src/main/resources/META-INF/dubbo/internal/org.apache.dubbo.registry.client.ServiceDiscovery
new file mode 100644
index 0000000..6fe7bd6
--- /dev/null
+++ b/dubbo-registry/dubbo-registry-xds/src/main/resources/META-INF/dubbo/internal/org.apache.dubbo.registry.client.ServiceDiscovery
@@ -0,0 +1 @@
+xds=org.apache.dubbo.registry.xds.XdsServiceDiscovery
\ No newline at end of file
diff --git a/dubbo-registry/dubbo-registry-xds/src/main/resources/META-INF/dubbo/internal/org.apache.dubbo.registry.client.ServiceDiscoveryFactory b/dubbo-registry/dubbo-registry-xds/src/main/resources/META-INF/dubbo/internal/org.apache.dubbo.registry.client.ServiceDiscoveryFactory
new file mode 100644
index 0000000..0335fa1
--- /dev/null
+++ b/dubbo-registry/dubbo-registry-xds/src/main/resources/META-INF/dubbo/internal/org.apache.dubbo.registry.client.ServiceDiscoveryFactory
@@ -0,0 +1 @@
+xds=org.apache.dubbo.registry.xds.XdsServiceDiscoveryFactory
\ No newline at end of file
diff --git a/dubbo-registry/pom.xml b/dubbo-registry/pom.xml
index db3ee35..a652eb8 100644
--- a/dubbo-registry/pom.xml
+++ b/dubbo-registry/pom.xml
@@ -37,5 +37,6 @@
<module>dubbo-registry-multiple</module>
<module>dubbo-registry-kubernetes</module>
<module>dubbo-registry-dns</module>
+ <module>dubbo-registry-xds</module>
</modules>
</project>