You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dubbo.apache.org by al...@apache.org on 2021/01/23 08:12:51 UTC

[dubbo] branch 3.0 updated: Add basic framework for xDS Registry (#7112)

This is an automated email from the ASF dual-hosted git repository.

albumenj pushed a commit to branch 3.0
in repository https://gitbox.apache.org/repos/asf/dubbo.git


The following commit(s) were added to refs/heads/3.0 by this push:
     new b0043c3  Add basic framework for xDS Registry (#7112)
b0043c3 is described below

commit b0043c3f5d5aa8eb4b41ea8d4f6db36255a52ef1
Author: Albumen Kevin <jh...@gmail.com>
AuthorDate: Sat Jan 23 16:12:21 2021 +0800

    Add basic framework for xDS Registry (#7112)
    
    * introduce basic xds registry
    
    * complete protocol develop
    
    * add xds dependency to project level
    
    * Finish self-host metadata fetch
    
    * add comment
    
    * cleanup pom
---
 dubbo-dependencies-bom/pom.xml                     |   6 +
 dubbo-distribution/dubbo-all/pom.xml               |   8 +
 dubbo-distribution/dubbo-bom/pom.xml               |   5 +
 .../client/SelfHostMetaServiceDiscovery.java       |   3 +-
 dubbo-registry/dubbo-registry-xds/pom.xml          |  68 ++++++
 .../org/apache/dubbo/registry/xds/XdsRegistry.java |  58 +++++
 .../dubbo/registry/xds/XdsRegistryFactory.java     |  34 +++
 .../dubbo/registry/xds/XdsServiceDiscovery.java    |  78 ++++++
 .../registry/xds/XdsServiceDiscoveryFactory.java   |  28 +++
 .../dubbo/registry/xds/util/NodeBuilder.java       |  29 +++
 .../dubbo/registry/xds/util/PilotExchanger.java    | 146 ++++++++++++
 .../apache/dubbo/registry/xds/util/XdsChannel.java |  58 +++++
 .../xds/util/protocol/AbstractProtocol.java        | 262 +++++++++++++++++++++
 .../registry/xds/util/protocol/DeltaResource.java  |  32 +++
 .../registry/xds/util/protocol/XdsProtocol.java    |  48 ++++
 .../xds/util/protocol/delta/DeltaEndpoint.java     |  51 ++++
 .../xds/util/protocol/delta/DeltaListener.java     |  49 ++++
 .../xds/util/protocol/delta/DeltaRoute.java        |  47 ++++
 .../xds/util/protocol/impl/EdsProtocol.java        | 114 +++++++++
 .../xds/util/protocol/impl/LdsProtocol.java        | 123 ++++++++++
 .../xds/util/protocol/impl/RdsProtocol.java        | 115 +++++++++
 .../xds/util/protocol/message/Endpoint.java        |  88 +++++++
 .../xds/util/protocol/message/EndpointResult.java  |  62 +++++
 .../xds/util/protocol/message/ListenerResult.java  |  70 ++++++
 .../xds/util/protocol/message/RouteResult.java     |  73 ++++++
 .../org.apache.dubbo.registry.RegistryFactory      |   1 +
 ...g.apache.dubbo.registry.client.ServiceDiscovery |   1 +
 ...e.dubbo.registry.client.ServiceDiscoveryFactory |   1 +
 dubbo-registry/pom.xml                             |   1 +
 29 files changed, 1658 insertions(+), 1 deletion(-)

diff --git a/dubbo-dependencies-bom/pom.xml b/dubbo-dependencies-bom/pom.xml
index 8d84b38..f409ae8 100644
--- a/dubbo-dependencies-bom/pom.xml
+++ b/dubbo-dependencies-bom/pom.xml
@@ -125,6 +125,7 @@
         <snakeyaml_version>1.20</snakeyaml_version>
         <commons_lang3_version>3.8.1</commons_lang3_version>
         <protostuff_version>1.5.9</protostuff_version>
+        <envoy_api_version>0.1.23</envoy_api_version>
 
         <rs_api_version>2.0</rs_api_version>
         <resteasy_version>3.0.19.Final</resteasy_version>
@@ -533,6 +534,11 @@
                 <artifactId>commons-lang3</artifactId>
                 <version>${commons_lang3_version}</version>
             </dependency>
+            <dependency>
+                <groupId>io.envoyproxy.controlplane</groupId>
+                <artifactId>api</artifactId>
+                <version>${envoy_api_version}</version>
+            </dependency>
 
             <!-- for dubbo-rpc-webservice -->
             <dependency>
diff --git a/dubbo-distribution/dubbo-all/pom.xml b/dubbo-distribution/dubbo-all/pom.xml
index e8e4406..6b009b4 100644
--- a/dubbo-distribution/dubbo-all/pom.xml
+++ b/dubbo-distribution/dubbo-all/pom.xml
@@ -173,6 +173,13 @@
         </dependency>
         <dependency>
             <groupId>org.apache.dubbo</groupId>
+            <artifactId>dubbo-registry-xds</artifactId>
+            <version>${project.version}</version>
+            <scope>compile</scope>
+            <optional>true</optional>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.dubbo</groupId>
             <artifactId>dubbo-monitor-api</artifactId>
             <version>${project.version}</version>
             <scope>compile</scope>
@@ -374,6 +381,7 @@
                                     <include>org.apache.dubbo:dubbo-registry-multiple</include>
                                     <include>org.apache.dubbo:dubbo-registry-kubernetes</include>
                                     <include>org.apache.dubbo:dubbo-registry-dns</include>
+                                    <include>org.apache.dubbo:dubbo-registry-xds</include>
                                     <include>org.apache.dubbo:dubbo-monitor-api</include>
                                     <include>org.apache.dubbo:dubbo-monitor-default</include>
                                     <include>org.apache.dubbo:dubbo-container-api</include>
diff --git a/dubbo-distribution/dubbo-bom/pom.xml b/dubbo-distribution/dubbo-bom/pom.xml
index d3c1b33..5826d10 100644
--- a/dubbo-distribution/dubbo-bom/pom.xml
+++ b/dubbo-distribution/dubbo-bom/pom.xml
@@ -135,6 +135,11 @@
             </dependency>
             <dependency>
                 <groupId>org.apache.dubbo</groupId>
+                <artifactId>dubbo-registry-xds</artifactId>
+                <version>${project.version}</version>
+            </dependency>
+            <dependency>
+                <groupId>org.apache.dubbo</groupId>
                 <artifactId>dubbo-monitor-api</artifactId>
                 <version>${project.version}</version>
             </dependency>
diff --git a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/SelfHostMetaServiceDiscovery.java b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/SelfHostMetaServiceDiscovery.java
index 25ab77e..1034420 100644
--- a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/SelfHostMetaServiceDiscovery.java
+++ b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/SelfHostMetaServiceDiscovery.java
@@ -240,7 +240,8 @@ public abstract class SelfHostMetaServiceDiscovery implements ServiceDiscovery {
                 serviceInstanceRevisionMap.put(serviceName, serviceInstanceRevision));
 
         if (logger.isDebugEnabled()) {
-            logger.debug("Poll DNS data. Service Instance changed: " + changed + " Service Name: " + serviceName);
+            logger.debug("Service changed event received (possibly because of DNS polling). " +
+                    "Service Instance changed: " + changed + " Service Name: " + serviceName);
         }
 
         if (changed) {
diff --git a/dubbo-registry/dubbo-registry-xds/pom.xml b/dubbo-registry/dubbo-registry-xds/pom.xml
new file mode 100644
index 0000000..effda5a
--- /dev/null
+++ b/dubbo-registry/dubbo-registry-xds/pom.xml
@@ -0,0 +1,68 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements.  See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+  -->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <parent>
+        <groupId>org.apache.dubbo</groupId>
+        <artifactId>dubbo-registry</artifactId>
+        <version>${revision}</version>
+    </parent>
+    <modelVersion>4.0.0</modelVersion>
+
+    <artifactId>dubbo-registry-xds</artifactId>
+    <name>${project.artifactId}</name>
+    <description>The Xds registry module of Dubbo project</description>
+
+    <dependencies>
+
+        <dependency>
+            <groupId>org.apache.dubbo</groupId>
+            <artifactId>dubbo-registry-api</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.dubbo</groupId>
+            <artifactId>dubbo-common</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>io.grpc</groupId>
+            <artifactId>grpc-protobuf</artifactId>
+        </dependency>
+
+        <dependency>
+            <groupId>io.grpc</groupId>
+            <artifactId>grpc-stub</artifactId>
+        </dependency>
+
+        <dependency>
+            <groupId>io.envoyproxy.controlplane</groupId>
+            <artifactId>api</artifactId>
+        </dependency>
+
+        <dependency>
+            <groupId>com.google.protobuf</groupId>
+            <artifactId>protobuf-java-util</artifactId>
+        </dependency>
+
+    </dependencies>
+
+</project>
\ No newline at end of file
diff --git a/dubbo-registry/dubbo-registry-xds/src/main/java/org/apache/dubbo/registry/xds/XdsRegistry.java b/dubbo-registry/dubbo-registry-xds/src/main/java/org/apache/dubbo/registry/xds/XdsRegistry.java
new file mode 100644
index 0000000..a4bd51e
--- /dev/null
+++ b/dubbo-registry/dubbo-registry-xds/src/main/java/org/apache/dubbo/registry/xds/XdsRegistry.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dubbo.registry.xds;
+
+import org.apache.dubbo.common.URL;
+import org.apache.dubbo.registry.NotifyListener;
+import org.apache.dubbo.registry.support.FailbackRegistry;
+
+/**
+ * Empty implements for xDS <br/>
+ * xDS only support `Service Discovery` mode register <br/>
+ * Used to compat past version like 2.6.x, 2.7.x with interface level register <br/>
+ * {@link XdsServiceDiscovery} is the real implementation of xDS
+ */
+public class XdsRegistry extends FailbackRegistry {
+    public XdsRegistry(URL url) {
+        super(url);
+    }
+
+    @Override
+    public boolean isAvailable() {
+        return true;
+    }
+
+    @Override
+    public void doRegister(URL url) {
+
+    }
+
+    @Override
+    public void doUnregister(URL url) {
+
+    }
+
+    @Override
+    public void doSubscribe(URL url, NotifyListener listener) {
+
+    }
+
+    @Override
+    public void doUnsubscribe(URL url, NotifyListener listener) {
+
+    }
+}
diff --git a/dubbo-registry/dubbo-registry-xds/src/main/java/org/apache/dubbo/registry/xds/XdsRegistryFactory.java b/dubbo-registry/dubbo-registry-xds/src/main/java/org/apache/dubbo/registry/xds/XdsRegistryFactory.java
new file mode 100644
index 0000000..8fa130b
--- /dev/null
+++ b/dubbo-registry/dubbo-registry-xds/src/main/java/org/apache/dubbo/registry/xds/XdsRegistryFactory.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dubbo.registry.xds;
+
+import org.apache.dubbo.common.URL;
+import org.apache.dubbo.registry.Registry;
+import org.apache.dubbo.registry.support.AbstractRegistryFactory;
+
+public class XdsRegistryFactory extends AbstractRegistryFactory {
+
+    @Override
+    protected String createRegistryCacheKey(URL url) {
+        return url.toFullString();
+    }
+
+    @Override
+    protected Registry createRegistry(URL url) {
+        return new XdsRegistry(url);
+    }
+}
diff --git a/dubbo-registry/dubbo-registry-xds/src/main/java/org/apache/dubbo/registry/xds/XdsServiceDiscovery.java b/dubbo-registry/dubbo-registry-xds/src/main/java/org/apache/dubbo/registry/xds/XdsServiceDiscovery.java
new file mode 100644
index 0000000..0651e3e
--- /dev/null
+++ b/dubbo-registry/dubbo-registry-xds/src/main/java/org/apache/dubbo/registry/xds/XdsServiceDiscovery.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dubbo.registry.xds;
+
+import org.apache.dubbo.common.URL;
+import org.apache.dubbo.registry.client.DefaultServiceInstance;
+import org.apache.dubbo.registry.client.SelfHostMetaServiceDiscovery;
+import org.apache.dubbo.registry.client.ServiceInstance;
+import org.apache.dubbo.registry.client.event.listener.ServiceInstancesChangedListener;
+import org.apache.dubbo.registry.xds.util.PilotExchanger;
+import org.apache.dubbo.registry.xds.util.protocol.message.Endpoint;
+
+import java.util.Collection;
+import java.util.Comparator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Set;
+
+public class XdsServiceDiscovery extends SelfHostMetaServiceDiscovery {
+    private PilotExchanger exchanger;
+    private URL registryURL;
+
+    @Override
+    public void doInitialize(URL registryURL) throws Exception {
+        exchanger = PilotExchanger.initialize(registryURL);
+    }
+
+    @Override
+    public void doDestroy() throws Exception {
+        exchanger.destroy();
+    }
+
+    @Override
+    public Set<String> getServices() {
+        return exchanger.getServices();
+    }
+
+    @Override
+    public List<ServiceInstance> getInstances(String serviceName) throws NullPointerException {
+        Set<Endpoint> endpoints = exchanger.getEndpoints(serviceName);
+        return changedToInstances(serviceName, endpoints);
+    }
+
+    @Override
+    public void addServiceInstancesChangedListener(ServiceInstancesChangedListener listener) throws NullPointerException, IllegalArgumentException {
+        listener.getServiceNames().forEach(serviceName -> {
+            exchanger.observeEndpoints(serviceName, (endpoints -> {
+                notifyListener(serviceName, listener, changedToInstances(serviceName, endpoints));
+            }));
+        });
+    }
+
+    private List<ServiceInstance> changedToInstances(String serviceName, Collection<Endpoint> endpoints) {
+        List<ServiceInstance> instances = new LinkedList<>();
+        endpoints.forEach(endpoint -> {
+            DefaultServiceInstance serviceInstance = new DefaultServiceInstance(serviceName, endpoint.getAddress(), endpoint.getPortValue());
+            // fill metadata by SelfHostMetaServiceDiscovery, will be fetched by RPC request
+            fillServiceInstance(serviceInstance);
+            instances.add(serviceInstance);
+        });
+        instances.sort(Comparator.comparingInt(ServiceInstance::hashCode));
+        return instances;
+    }
+}
diff --git a/dubbo-registry/dubbo-registry-xds/src/main/java/org/apache/dubbo/registry/xds/XdsServiceDiscoveryFactory.java b/dubbo-registry/dubbo-registry-xds/src/main/java/org/apache/dubbo/registry/xds/XdsServiceDiscoveryFactory.java
new file mode 100644
index 0000000..ef77ccb
--- /dev/null
+++ b/dubbo-registry/dubbo-registry-xds/src/main/java/org/apache/dubbo/registry/xds/XdsServiceDiscoveryFactory.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dubbo.registry.xds;
+
+import org.apache.dubbo.common.URL;
+import org.apache.dubbo.registry.client.AbstractServiceDiscoveryFactory;
+import org.apache.dubbo.registry.client.ServiceDiscovery;
+
+public class XdsServiceDiscoveryFactory extends AbstractServiceDiscoveryFactory {
+    @Override
+    protected ServiceDiscovery createDiscovery(URL registryURL) {
+        return new XdsServiceDiscovery();
+    }
+}
diff --git a/dubbo-registry/dubbo-registry-xds/src/main/java/org/apache/dubbo/registry/xds/util/NodeBuilder.java b/dubbo-registry/dubbo-registry-xds/src/main/java/org/apache/dubbo/registry/xds/util/NodeBuilder.java
new file mode 100644
index 0000000..fad0dbf
--- /dev/null
+++ b/dubbo-registry/dubbo-registry-xds/src/main/java/org/apache/dubbo/registry/xds/util/NodeBuilder.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dubbo.registry.xds.util;
+
+import io.envoyproxy.envoy.config.core.v3.Node;
+
+public class NodeBuilder {
+    public static Node build() {
+        // TODO: fetch data from environment
+        return Node.newBuilder()
+                .setId("sidecar~127.0.0.1~ratings-v1-7dc98c7588-lwvqd.default~default.svc.cluster.local")
+                .setCluster("ratings.default")
+                .build();
+    }
+}
diff --git a/dubbo-registry/dubbo-registry-xds/src/main/java/org/apache/dubbo/registry/xds/util/PilotExchanger.java b/dubbo-registry/dubbo-registry-xds/src/main/java/org/apache/dubbo/registry/xds/util/PilotExchanger.java
new file mode 100644
index 0000000..3d49bc1
--- /dev/null
+++ b/dubbo-registry/dubbo-registry-xds/src/main/java/org/apache/dubbo/registry/xds/util/PilotExchanger.java
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dubbo.registry.xds.util;
+
+import org.apache.dubbo.common.URL;
+import org.apache.dubbo.common.utils.CollectionUtils;
+import org.apache.dubbo.common.utils.ConcurrentHashSet;
+import org.apache.dubbo.registry.xds.util.protocol.impl.EdsProtocol;
+import org.apache.dubbo.registry.xds.util.protocol.impl.LdsProtocol;
+import org.apache.dubbo.registry.xds.util.protocol.impl.RdsProtocol;
+import org.apache.dubbo.registry.xds.util.protocol.message.Endpoint;
+import org.apache.dubbo.registry.xds.util.protocol.message.EndpointResult;
+import org.apache.dubbo.registry.xds.util.protocol.message.ListenerResult;
+import org.apache.dubbo.registry.xds.util.protocol.message.RouteResult;
+
+import java.util.Collections;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.function.Consumer;
+
+public class PilotExchanger {
+
+    private final XdsChannel xdsChannel;
+
+    private final RdsProtocol rdsProtocol;
+
+    private final EdsProtocol edsProtocol;
+
+    private ListenerResult listenerResult;
+
+    private RouteResult routeResult;
+
+    private final long observeRouteRequest;
+
+    private final Map<String, Long> domainObserveRequest = new ConcurrentHashMap<>();
+
+    private final Map<String, Set<Consumer<Set<Endpoint>>>> domainObserveConsumer = new ConcurrentHashMap<>();
+
+    private PilotExchanger(URL url) {
+        xdsChannel = new XdsChannel(url);
+        LdsProtocol ldsProtocol = new LdsProtocol(xdsChannel, NodeBuilder.build());
+        this.rdsProtocol = new RdsProtocol(xdsChannel, NodeBuilder.build());
+        this.edsProtocol = new EdsProtocol(xdsChannel, NodeBuilder.build());
+
+        this.listenerResult = ldsProtocol.getListeners();
+        this.routeResult = rdsProtocol.getResource(listenerResult.getRouteConfigNames());
+
+        // Observer RDS update
+        this.observeRouteRequest = rdsProtocol.observeResource(listenerResult.getRouteConfigNames(), (newResult) -> {
+            // check if observed domain update ( will update endpoint observation )
+            domainObserveConsumer.forEach((domain, consumer) -> {
+                Set<String> newRoute = newResult.searchDomain(domain);
+                if (!routeResult.searchDomain(domain).equals(newRoute)) {
+                    // routers in observed domain has been updated
+                    Long domainRequest = domainObserveRequest.get(domain);
+                    if (domainRequest == null) {
+                        // router list is empty when observeEndpoints() called and domainRequest has not been created yet
+                        // create new observation
+                        doObserveEndpoints(domain);
+                    } else {
+                        // update observation by domainRequest
+                        edsProtocol.updateObserve(domainRequest, newRoute);
+                    }
+                }
+            });
+            // update local cache
+            routeResult = newResult;
+        });
+
+        // Observe LDS updated
+        ldsProtocol.observeListeners((newListener) -> {
+            // update local cache
+            this.listenerResult = newListener;
+            // update RDS observation
+            rdsProtocol.updateObserve(observeRouteRequest, newListener.getRouteConfigNames());
+        });
+    }
+
+    public static PilotExchanger initialize(URL url) {
+        return new PilotExchanger(url);
+    }
+
+    public void destroy() {
+        xdsChannel.destroy();
+    }
+
+    public Set<String> getServices() {
+        return routeResult.getDomains();
+    }
+
+    public Set<Endpoint> getEndpoints(String domain) {
+        Set<String> cluster = routeResult.searchDomain(domain);
+        if (CollectionUtils.isNotEmpty(cluster)) {
+            EndpointResult endpoint = edsProtocol.getResource(cluster);
+            return endpoint.getEndpoints();
+        } else {
+            return Collections.emptySet();
+        }
+    }
+
+    public void observeEndpoints(String domain, Consumer<Set<Endpoint>> consumer) {
+        // store Consumer
+        domainObserveConsumer.compute(domain, (k, v) -> {
+            if (v == null) {
+                v = new ConcurrentHashSet<>();
+            }
+            // support multi-consumer
+            v.add(consumer);
+            return v;
+        });
+        if (!domainObserveRequest.containsKey(domain)) {
+            doObserveEndpoints(domain);
+        }
+    }
+
+    private void doObserveEndpoints(String domain) {
+        Set<String> router = routeResult.searchDomain(domain);
+        // if router is empty, do nothing
+        // observation will be created when RDS updates
+        if (CollectionUtils.isNotEmpty(router)) {
+            long endpointRequest =
+                    edsProtocol.observeResource(
+                            router,
+                            endpointResult ->
+                                    // notify consumers
+                                    domainObserveConsumer.get(domain).forEach(
+                                            consumer1 -> consumer1.accept(endpointResult.getEndpoints())));
+            domainObserveRequest.put(domain, endpointRequest);
+        }
+    }
+}
diff --git a/dubbo-registry/dubbo-registry-xds/src/main/java/org/apache/dubbo/registry/xds/util/XdsChannel.java b/dubbo-registry/dubbo-registry-xds/src/main/java/org/apache/dubbo/registry/xds/util/XdsChannel.java
new file mode 100644
index 0000000..5336378
--- /dev/null
+++ b/dubbo-registry/dubbo-registry-xds/src/main/java/org/apache/dubbo/registry/xds/util/XdsChannel.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dubbo.registry.xds.util;
+
+import org.apache.dubbo.common.URL;
+
+import io.envoyproxy.envoy.service.discovery.v3.AggregatedDiscoveryServiceGrpc;
+import io.envoyproxy.envoy.service.discovery.v3.DeltaDiscoveryRequest;
+import io.envoyproxy.envoy.service.discovery.v3.DeltaDiscoveryResponse;
+import io.envoyproxy.envoy.service.discovery.v3.DiscoveryRequest;
+import io.envoyproxy.envoy.service.discovery.v3.DiscoveryResponse;
+import io.grpc.ManagedChannel;
+import io.grpc.ManagedChannelBuilder;
+import io.grpc.stub.StreamObserver;
+
+public class XdsChannel {
+    private final ManagedChannel channel;
+
+    protected XdsChannel(URL url) {
+        channel = ManagedChannelBuilder.forAddress(url.getHost(), url.getPort())
+                .usePlaintext()
+                .build();
+    }
+
+    public StreamObserver<DeltaDiscoveryRequest> observeDeltaDiscoveryRequest(StreamObserver<DeltaDiscoveryResponse> observer) {
+        return AggregatedDiscoveryServiceGrpc.newStub(channel).deltaAggregatedResources(observer);
+    }
+
+    public StreamObserver<DiscoveryRequest> createDeltaDiscoveryRequest(StreamObserver<DiscoveryResponse> observer) {
+        return AggregatedDiscoveryServiceGrpc.newStub(channel).streamAggregatedResources(observer);
+    }
+
+    public StreamObserver<io.envoyproxy.envoy.api.v2.DeltaDiscoveryRequest> observeDeltaDiscoveryRequestV2(StreamObserver<io.envoyproxy.envoy.api.v2.DeltaDiscoveryResponse> observer) {
+        return io.envoyproxy.envoy.service.discovery.v2.AggregatedDiscoveryServiceGrpc.newStub(channel).deltaAggregatedResources(observer);
+    }
+
+    public StreamObserver<io.envoyproxy.envoy.api.v2.DiscoveryRequest> createDeltaDiscoveryRequestV2(StreamObserver<io.envoyproxy.envoy.api.v2.DiscoveryResponse> observer) {
+        return io.envoyproxy.envoy.service.discovery.v2.AggregatedDiscoveryServiceGrpc.newStub(channel).streamAggregatedResources(observer);
+    }
+
+    public void destroy() {
+        channel.shutdown();
+    }
+}
diff --git a/dubbo-registry/dubbo-registry-xds/src/main/java/org/apache/dubbo/registry/xds/util/protocol/AbstractProtocol.java b/dubbo-registry/dubbo-registry-xds/src/main/java/org/apache/dubbo/registry/xds/util/protocol/AbstractProtocol.java
new file mode 100644
index 0000000..13ac5be
--- /dev/null
+++ b/dubbo-registry/dubbo-registry-xds/src/main/java/org/apache/dubbo/registry/xds/util/protocol/AbstractProtocol.java
@@ -0,0 +1,262 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dubbo.registry.xds.util.protocol;
+
+import org.apache.dubbo.common.logger.Logger;
+import org.apache.dubbo.common.logger.LoggerFactory;
+import org.apache.dubbo.registry.xds.util.XdsChannel;
+
+import io.envoyproxy.envoy.config.core.v3.Node;
+import io.envoyproxy.envoy.service.discovery.v3.DeltaDiscoveryRequest;
+import io.envoyproxy.envoy.service.discovery.v3.DeltaDiscoveryResponse;
+import io.envoyproxy.envoy.service.discovery.v3.DiscoveryRequest;
+import io.envoyproxy.envoy.service.discovery.v3.DiscoveryResponse;
+import io.grpc.stub.StreamObserver;
+
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.Consumer;
+
+public abstract class AbstractProtocol<T, S extends DeltaResource<T>> implements XdsProtocol<T>{
+
+    private static final Logger logger = LoggerFactory.getLogger(AbstractProtocol.class);
+
+    protected final XdsChannel xdsChannel;
+
+    protected final Node node;
+
+    /**
+     * Store Request Parameter ( resourceNames )
+     * K - requestId, V - resourceNames
+     */
+    protected final Map<Long, Set<String>> requestParam = new ConcurrentHashMap<>();
+
+    /**
+     * Store ADS Request Observer ( StreamObserver in Streaming Request )
+     * K - requestId, V - StreamObserver
+     */
+    private final Map<Long, StreamObserver<DiscoveryRequest>> requestObserverMap = new ConcurrentHashMap<>();
+
+    /**
+     * Store Delta-ADS Request Observer ( StreamObserver in Streaming Request )
+     * K - requestId, V - StreamObserver
+     */
+    private final Map<Long, StreamObserver<DeltaDiscoveryRequest>> deltaRequestObserverMap = new ConcurrentHashMap<>();
+
+    /**
+     * Store CompletableFuture for Request ( used to fetch async result in ResponseObserver )
+     * K - requestId, V - CompletableFuture
+     */
+    private final Map<Long, CompletableFuture<T>> streamResult = new ConcurrentHashMap<>();
+
+    /**
+     * Store consumers for Observers ( will consume message produced by Delta-ADS )
+     * K - requestId, V - Consumer
+     */
+    private final Map<Long, Consumer<T>> consumers = new ConcurrentHashMap<>();
+
+    protected final AtomicLong requestId = new AtomicLong(0);
+
+    public AbstractProtocol(XdsChannel xdsChannel, Node node) {
+        this.xdsChannel = xdsChannel;
+        this.node = node;
+    }
+
+    /**
+     * Abstract method to obtain Type-URL from sub-class
+     *
+     * @return Type-URL of xDS
+     */
+    public abstract String getTypeUrl();
+
+    @Override
+    public T getResource(Set<String> resourceNames) {
+        long request = requestId.getAndIncrement();
+
+        // Store Request Parameter, which will be used for ACK
+        requestParam.put(request, resourceNames);
+
+        // create observer
+        StreamObserver<DiscoveryRequest> requestObserver = xdsChannel.createDeltaDiscoveryRequest(new ResponseObserver(request));
+
+        // use future to get async result
+        CompletableFuture<T> future = new CompletableFuture<>();
+        requestObserverMap.put(request, requestObserver);
+        streamResult.put(request, future);
+
+        // send request to control panel
+        requestObserver.onNext(buildDiscoveryRequest(resourceNames));
+
+        try {
+            // get result
+            return future.get();
+        } catch (InterruptedException | ExecutionException e) {
+            logger.error("Error occur when request control panel.");
+            return null;
+        } finally {
+            // close observer
+            requestObserver.onCompleted();
+
+            // remove temp
+            streamResult.remove(request);
+            requestObserverMap.remove(request);
+            requestParam.remove(request);
+        }
+    }
+
+    @Override
+    public long observeResource(Set<String> resourceNames, Consumer<T> consumer) {
+        long request = requestId.getAndIncrement();
+
+        // Store Request Parameter, which will be used for ACK
+        requestParam.put(request, resourceNames);
+
+        // call once for full data
+        consumer.accept(getResource(resourceNames));
+
+        consumers.put(request, consumer);
+        deltaRequestObserverMap.compute(request, (k, v) -> {
+            // create Delta-ADS observer
+            v= xdsChannel.observeDeltaDiscoveryRequest(new DeltaResponseObserver(request));
+
+            // send observe request
+            v.onNext(buildDeltaDiscoveryRequest(resourceNames));
+            return v;
+        });
+        return request;
+    }
+
+    @Override
+    public void updateObserve(long request, Set<String> resourceNames) {
+        // send difference in resourceNames
+        deltaRequestObserverMap.get(request).onNext(buildDeltaDiscoveryRequest(request, resourceNames));
+    }
+
+    protected DiscoveryRequest buildDiscoveryRequest(Set<String> resourceNames) {
+        return DiscoveryRequest.newBuilder()
+                .setNode(node)
+                .setTypeUrl(getTypeUrl())
+                .addAllResourceNames(resourceNames)
+                .build();
+    }
+
+    protected DiscoveryRequest buildDiscoveryRequest(Set<String> resourceNames, DiscoveryResponse response) {
+        // for ACK
+        return DiscoveryRequest.newBuilder()
+                .setNode(node)
+                .setTypeUrl(getTypeUrl())
+                .addAllResourceNames(resourceNames)
+                .setVersionInfo(response.getVersionInfo())
+                .setResponseNonce(response.getNonce())
+                .build();
+    }
+
+    protected DeltaDiscoveryRequest buildDeltaDiscoveryRequest(Set<String> resourceNames) {
+        return DeltaDiscoveryRequest.newBuilder()
+                .setNode(node)
+                .setTypeUrl(getTypeUrl())
+                .addAllResourceNamesSubscribe(resourceNames)
+                .build();
+    }
+
+    protected DeltaDiscoveryRequest buildDeltaDiscoveryRequest(long request, Set<String> resourceNames) {
+        // compare with previous
+        Set<String> previous = requestParam.get(request);
+        Set<String> unsubscribe = new HashSet<String>(previous) {{
+            removeAll(resourceNames);
+        }};
+        requestParam.put(request, resourceNames);
+        return DeltaDiscoveryRequest.newBuilder()
+                .setNode(node)
+                .setTypeUrl(getTypeUrl())
+                .addAllResourceNamesUnsubscribe(unsubscribe)
+                .addAllResourceNamesSubscribe(resourceNames)
+                .build();
+    }
+
+    private DeltaDiscoveryRequest buildDeltaDiscoveryRequest(Set<String> resourceNames, DeltaDiscoveryResponse response) {
+        // for ACK
+        return DeltaDiscoveryRequest.newBuilder()
+                .setNode(node)
+                .setTypeUrl(getTypeUrl())
+                .addAllResourceNamesSubscribe(resourceNames)
+                .setResponseNonce(response.getNonce())
+                .build();
+    }
+
+    protected abstract T decodeDiscoveryResponse(DiscoveryResponse response);
+
+    protected abstract S decodeDeltaDiscoveryResponse(DeltaDiscoveryResponse response, S previous);
+
+    private class ResponseObserver implements StreamObserver<DiscoveryResponse> {
+        private final long requestId;
+
+        public ResponseObserver(long requestId) {
+            this.requestId = requestId;
+        }
+
+        @Override
+        public void onNext(DiscoveryResponse value) {
+            T result = decodeDiscoveryResponse(value);
+            requestObserverMap.get(requestId).onNext(buildDiscoveryRequest(requestParam.get(requestId), value));
+            streamResult.get(requestId).complete(result);
+        }
+
+        @Override
+        public void onError(Throwable t) {
+            logger.error("xDS Client received error message! detail:", t);
+        }
+
+        @Override
+        public void onCompleted() {
+            // ignore
+        }
+    }
+
+    private class DeltaResponseObserver implements StreamObserver<DeltaDiscoveryResponse> {
+        private S delta = null;
+        private final long requestId;
+
+        public DeltaResponseObserver(long requestId) {
+            this.requestId = requestId;
+        }
+
+        @Override
+        public void onNext(DeltaDiscoveryResponse value) {
+            delta = decodeDeltaDiscoveryResponse(value, delta);
+            T routes = delta.getResource();
+            consumers.get(requestId).accept(routes);
+            deltaRequestObserverMap.get(requestId).onNext(buildDeltaDiscoveryRequest(requestParam.get(requestId), value));
+        }
+
+        @Override
+        public void onError(Throwable t) {
+            logger.error("xDS Client received error message! detail:", t);
+        }
+
+        @Override
+        public void onCompleted() {
+            // ignore
+        }
+    }
+
+}
diff --git a/dubbo-registry/dubbo-registry-xds/src/main/java/org/apache/dubbo/registry/xds/util/protocol/DeltaResource.java b/dubbo-registry/dubbo-registry-xds/src/main/java/org/apache/dubbo/registry/xds/util/protocol/DeltaResource.java
new file mode 100644
index 0000000..bcae39c
--- /dev/null
+++ b/dubbo-registry/dubbo-registry-xds/src/main/java/org/apache/dubbo/registry/xds/util/protocol/DeltaResource.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dubbo.registry.xds.util.protocol;
+
+/**
+ * A interface for resources in xDS, which can be updated by ADS delta stream
+ * <br/>
+ * This interface is design to unify the way of fetching data in delta stream
+ * in {@link org.apache.dubbo.registry.xds.util.PilotExchanger}
+ */
+public interface DeltaResource<T> {
+    /**
+     * Get resource from delta stream
+     *
+     * @return the newest resource from stream
+     */
+    T getResource();
+}
diff --git a/dubbo-registry/dubbo-registry-xds/src/main/java/org/apache/dubbo/registry/xds/util/protocol/XdsProtocol.java b/dubbo-registry/dubbo-registry-xds/src/main/java/org/apache/dubbo/registry/xds/util/protocol/XdsProtocol.java
new file mode 100644
index 0000000..25ec3fd
--- /dev/null
+++ b/dubbo-registry/dubbo-registry-xds/src/main/java/org/apache/dubbo/registry/xds/util/protocol/XdsProtocol.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dubbo.registry.xds.util.protocol;
+
+import java.util.Set;
+import java.util.function.Consumer;
+
+public interface XdsProtocol<T> {
+    /**
+     * Gets all {@link T resource} by the specified resource name.
+     * For LDS, the {@param resourceNames} is ignored
+     *
+     * @param resourceNames specified resource name
+     * @return resources, null if request failed
+     */
+    T getResource(Set<String> resourceNames);
+
+    /**
+     * Add a observer resource with {@link Consumer}
+     *
+     * @param resourceNames specified resource name
+     * @param consumer resource notifier, will be called when resource updated
+     * @return requestId, used when resourceNames update with {@link XdsProtocol#updateObserve(long, Set)}
+     */
+    long observeResource(Set<String> resourceNames, Consumer<T> consumer);
+
+    /**
+     * Update observed resource list in {@link XdsProtocol#observeResource(Set, Consumer)}
+     *
+     * @param request requestId returned by {@link XdsProtocol#observeResource(Set, Consumer)}
+     * @param resourceNames new resource name list to observe
+     */
+    void updateObserve(long request, Set<String> resourceNames);
+}
diff --git a/dubbo-registry/dubbo-registry-xds/src/main/java/org/apache/dubbo/registry/xds/util/protocol/delta/DeltaEndpoint.java b/dubbo-registry/dubbo-registry-xds/src/main/java/org/apache/dubbo/registry/xds/util/protocol/delta/DeltaEndpoint.java
new file mode 100644
index 0000000..16823aa
--- /dev/null
+++ b/dubbo-registry/dubbo-registry-xds/src/main/java/org/apache/dubbo/registry/xds/util/protocol/delta/DeltaEndpoint.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dubbo.registry.xds.util.protocol.delta;
+
+import org.apache.dubbo.common.utils.CollectionUtils;
+import org.apache.dubbo.registry.xds.util.protocol.DeltaResource;
+import org.apache.dubbo.registry.xds.util.protocol.message.Endpoint;
+import org.apache.dubbo.registry.xds.util.protocol.message.EndpointResult;
+
+import java.util.Collection;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.stream.Collectors;
+
+public class DeltaEndpoint implements DeltaResource<EndpointResult> {
+    private final Map<String, Set<Endpoint>> data = new ConcurrentHashMap<>();
+
+    public void addResource(String resourceName, Set<Endpoint> endpoints) {
+        data.put(resourceName, endpoints);
+    }
+
+    public void removeResource(Collection<String> resourceName) {
+        if (CollectionUtils.isNotEmpty(resourceName)) {
+            resourceName.forEach(data::remove);
+        }
+    }
+
+    @Override
+    public EndpointResult getResource() {
+        Set<Endpoint> set = data.values().stream()
+                .flatMap(Set::stream)
+                .collect(Collectors.toSet());
+        return new EndpointResult(set);
+    }
+
+}
diff --git a/dubbo-registry/dubbo-registry-xds/src/main/java/org/apache/dubbo/registry/xds/util/protocol/delta/DeltaListener.java b/dubbo-registry/dubbo-registry-xds/src/main/java/org/apache/dubbo/registry/xds/util/protocol/delta/DeltaListener.java
new file mode 100644
index 0000000..8b66217
--- /dev/null
+++ b/dubbo-registry/dubbo-registry-xds/src/main/java/org/apache/dubbo/registry/xds/util/protocol/delta/DeltaListener.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dubbo.registry.xds.util.protocol.delta;
+
+import org.apache.dubbo.common.utils.CollectionUtils;
+import org.apache.dubbo.registry.xds.util.protocol.DeltaResource;
+import org.apache.dubbo.registry.xds.util.protocol.message.ListenerResult;
+
+import java.util.Collection;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.stream.Collectors;
+
+public class DeltaListener implements DeltaResource<ListenerResult> {
+    private final Map<String, Set<String>> data = new ConcurrentHashMap<>();
+
+    public void addResource(String resourceName, Set<String> listeners) {
+        data.put(resourceName, listeners);
+    }
+
+    public void removeResource(Collection<String> resourceName) {
+        if (CollectionUtils.isNotEmpty(resourceName)) {
+            resourceName.forEach(data::remove);
+        }
+    }
+
+    @Override
+    public ListenerResult getResource() {
+        Set<String> set = data.values().stream()
+                .flatMap(Set::stream)
+                .collect(Collectors.toSet());
+        return new ListenerResult(set);
+    }
+}
diff --git a/dubbo-registry/dubbo-registry-xds/src/main/java/org/apache/dubbo/registry/xds/util/protocol/delta/DeltaRoute.java b/dubbo-registry/dubbo-registry-xds/src/main/java/org/apache/dubbo/registry/xds/util/protocol/delta/DeltaRoute.java
new file mode 100644
index 0000000..71fdb47
--- /dev/null
+++ b/dubbo-registry/dubbo-registry-xds/src/main/java/org/apache/dubbo/registry/xds/util/protocol/delta/DeltaRoute.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dubbo.registry.xds.util.protocol.delta;
+
+import org.apache.dubbo.common.utils.CollectionUtils;
+import org.apache.dubbo.registry.xds.util.protocol.DeltaResource;
+import org.apache.dubbo.registry.xds.util.protocol.message.RouteResult;
+
+import java.util.Collection;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+
+public class DeltaRoute implements DeltaResource<RouteResult> {
+    private final Map<String, Map<String, Set<String>>> data = new ConcurrentHashMap<>();
+
+    public void addResource(String resourceName, Map<String, Set<String>> route) {
+        data.put(resourceName, route);
+    }
+
+    public void removeResource(Collection<String> resourceName) {
+        if (CollectionUtils.isNotEmpty(resourceName)) {
+            resourceName.forEach(data::remove);
+        }
+    }
+
+    @Override
+    public RouteResult getResource() {
+        Map<String, Set<String>> result = new ConcurrentHashMap<>();
+        data.values().forEach(result::putAll);
+        return new RouteResult(result);
+    }
+}
diff --git a/dubbo-registry/dubbo-registry-xds/src/main/java/org/apache/dubbo/registry/xds/util/protocol/impl/EdsProtocol.java b/dubbo-registry/dubbo-registry-xds/src/main/java/org/apache/dubbo/registry/xds/util/protocol/impl/EdsProtocol.java
new file mode 100644
index 0000000..7476c00
--- /dev/null
+++ b/dubbo-registry/dubbo-registry-xds/src/main/java/org/apache/dubbo/registry/xds/util/protocol/impl/EdsProtocol.java
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dubbo.registry.xds.util.protocol.impl;
+
+import org.apache.dubbo.common.logger.Logger;
+import org.apache.dubbo.common.logger.LoggerFactory;
+import org.apache.dubbo.registry.xds.util.XdsChannel;
+import org.apache.dubbo.registry.xds.util.protocol.AbstractProtocol;
+import org.apache.dubbo.registry.xds.util.protocol.delta.DeltaEndpoint;
+import org.apache.dubbo.registry.xds.util.protocol.message.Endpoint;
+import org.apache.dubbo.registry.xds.util.protocol.message.EndpointResult;
+
+import com.google.protobuf.Any;
+import com.google.protobuf.InvalidProtocolBufferException;
+import io.envoyproxy.envoy.config.core.v3.HealthStatus;
+import io.envoyproxy.envoy.config.core.v3.Node;
+import io.envoyproxy.envoy.config.core.v3.SocketAddress;
+import io.envoyproxy.envoy.config.endpoint.v3.ClusterLoadAssignment;
+import io.envoyproxy.envoy.config.endpoint.v3.LbEndpoint;
+import io.envoyproxy.envoy.service.discovery.v3.DeltaDiscoveryResponse;
+import io.envoyproxy.envoy.service.discovery.v3.DiscoveryResponse;
+import io.envoyproxy.envoy.service.discovery.v3.Resource;
+
+import java.util.Objects;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+public class EdsProtocol extends AbstractProtocol<EndpointResult, DeltaEndpoint> {
+
+    private static final Logger logger = LoggerFactory.getLogger(LdsProtocol.class);
+
+    public EdsProtocol(XdsChannel xdsChannel, Node node) {
+        super(xdsChannel, node);
+    }
+
+    @Override
+    public String getTypeUrl() {
+        return "type.googleapis.com/envoy.config.endpoint.v3.ClusterLoadAssignment";
+    }
+
+    @Override
+    protected  EndpointResult decodeDiscoveryResponse(DiscoveryResponse response) {
+        if (getTypeUrl().equals(response.getTypeUrl())) {
+            Set<Endpoint> set = response.getResourcesList().stream()
+                    .map(EdsProtocol::unpackClusterLoadAssignment)
+                    .filter(Objects::nonNull)
+                    .flatMap((e) -> decodeResourceToEndpoint(e).stream())
+                    .collect(Collectors.toSet());
+            return new EndpointResult(set);
+        }
+        return new EndpointResult();
+    }
+
+    @Override
+    protected  DeltaEndpoint decodeDeltaDiscoveryResponse(DeltaDiscoveryResponse response, DeltaEndpoint previous) {
+        DeltaEndpoint deltaEndpoint = previous;
+        if (deltaEndpoint == null) {
+            deltaEndpoint = new DeltaEndpoint();
+        }
+        if (getTypeUrl().equals(response.getTypeUrl())) {
+            deltaEndpoint.removeResource(response.getRemovedResourcesList());
+            for (Resource resource : response.getResourcesList()) {
+                ClusterLoadAssignment unpackedResource = unpackClusterLoadAssignment(resource.getResource());
+                if (unpackedResource == null) {
+                    continue;
+                }
+                deltaEndpoint.addResource(resource.getName(), decodeResourceToEndpoint(unpackedResource));
+            }
+        }
+        return previous;
+    }
+
+    private static Set<Endpoint> decodeResourceToEndpoint(ClusterLoadAssignment resource) {
+        return resource.getEndpointsList().stream()
+                .flatMap((e) -> e.getLbEndpointsList().stream())
+                .map(EdsProtocol::decodeLbEndpointToEndpoint)
+                .collect(Collectors.toSet());
+    }
+
+    private static Endpoint decodeLbEndpointToEndpoint(LbEndpoint lbEndpoint) {
+        Endpoint endpoint = new Endpoint();
+        SocketAddress address = lbEndpoint.getEndpoint().getAddress().getSocketAddress();
+        endpoint.setAddress(address.getAddress());
+        endpoint.setPortValue(address.getPortValue());
+        boolean healthy = HealthStatus.HEALTHY.equals(lbEndpoint.getHealthStatus()) ||
+                HealthStatus.UNKNOWN.equals(lbEndpoint.getHealthStatus());
+        endpoint.setHealthy(healthy);
+        endpoint.setWeight(lbEndpoint.getLoadBalancingWeight().getValue());
+        return endpoint;
+    }
+
+    private static ClusterLoadAssignment unpackClusterLoadAssignment(Any any) {
+        try {
+            return any.unpack(ClusterLoadAssignment.class);
+        } catch (InvalidProtocolBufferException e) {
+            logger.error("Error occur when decode xDS response.", e);
+            return null;
+        }
+    }
+}
diff --git a/dubbo-registry/dubbo-registry-xds/src/main/java/org/apache/dubbo/registry/xds/util/protocol/impl/LdsProtocol.java b/dubbo-registry/dubbo-registry-xds/src/main/java/org/apache/dubbo/registry/xds/util/protocol/impl/LdsProtocol.java
new file mode 100644
index 0000000..3116efa
--- /dev/null
+++ b/dubbo-registry/dubbo-registry-xds/src/main/java/org/apache/dubbo/registry/xds/util/protocol/impl/LdsProtocol.java
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dubbo.registry.xds.util.protocol.impl;
+
+import org.apache.dubbo.common.logger.Logger;
+import org.apache.dubbo.common.logger.LoggerFactory;
+import org.apache.dubbo.registry.xds.util.XdsChannel;
+import org.apache.dubbo.registry.xds.util.protocol.AbstractProtocol;
+import org.apache.dubbo.registry.xds.util.protocol.delta.DeltaListener;
+import org.apache.dubbo.registry.xds.util.protocol.message.ListenerResult;
+
+import com.google.protobuf.Any;
+import com.google.protobuf.InvalidProtocolBufferException;
+import io.envoyproxy.envoy.config.core.v3.Node;
+import io.envoyproxy.envoy.config.listener.v3.Filter;
+import io.envoyproxy.envoy.config.listener.v3.Listener;
+import io.envoyproxy.envoy.extensions.filters.network.http_connection_manager.v3.HttpConnectionManager;
+import io.envoyproxy.envoy.extensions.filters.network.http_connection_manager.v3.Rds;
+import io.envoyproxy.envoy.service.discovery.v3.DeltaDiscoveryResponse;
+import io.envoyproxy.envoy.service.discovery.v3.DiscoveryResponse;
+import io.envoyproxy.envoy.service.discovery.v3.Resource;
+
+import java.util.Objects;
+import java.util.Set;
+import java.util.function.Consumer;
+import java.util.stream.Collectors;
+
+public class LdsProtocol extends AbstractProtocol<ListenerResult, DeltaListener> {
+
+    private static final Logger logger = LoggerFactory.getLogger(LdsProtocol.class);
+
+    public LdsProtocol(XdsChannel xdsChannel, Node node) {
+        super(xdsChannel, node);
+    }
+
+    @Override
+    public String getTypeUrl() {
+        return "type.googleapis.com/envoy.config.listener.v3.Listener";
+    }
+
+    public ListenerResult getListeners() {
+        return getResource(null);
+    }
+
+    public void observeListeners(Consumer<ListenerResult> consumer) {
+        observeResource(null,consumer);
+    }
+
+    @Override
+    protected ListenerResult decodeDiscoveryResponse(DiscoveryResponse response) {
+        if (getTypeUrl().equals(response.getTypeUrl())) {
+            Set<String> set = response.getResourcesList().stream()
+                    .map(LdsProtocol::unpackListener)
+                    .filter(Objects::nonNull)
+                    .flatMap((e) -> decodeResourceToListener(e).stream())
+                    .collect(Collectors.toSet());
+            return new ListenerResult(set);
+        }
+        return new ListenerResult();
+    }
+
+    @Override
+    protected DeltaListener decodeDeltaDiscoveryResponse(DeltaDiscoveryResponse response, DeltaListener previous) {
+        DeltaListener deltaListener = previous;
+        if (deltaListener == null) {
+            deltaListener = new DeltaListener();
+        }
+        if (getTypeUrl().equals(response.getTypeUrl())) {
+            deltaListener.removeResource(response.getRemovedResourcesList());
+            for (Resource resource : response.getResourcesList()) {
+                Listener unpackedResource = unpackListener(resource.getResource());
+                if (unpackedResource == null) {
+                    continue;
+                }
+                deltaListener.addResource(resource.getName(), decodeResourceToListener(unpackedResource));
+            }
+        }
+        return deltaListener;
+    }
+
+    private Set<String> decodeResourceToListener(Listener resource) {
+        return resource.getFilterChainsList().stream()
+                .flatMap((e) -> e.getFiltersList().stream())
+                .map(Filter::getTypedConfig)
+                .map(LdsProtocol::unpackHttpConnectionManager)
+                .filter(Objects::nonNull)
+                .map(HttpConnectionManager::getRds)
+                .map(Rds::getRouteConfigName)
+                .collect(Collectors.toSet());
+    }
+
+    private static Listener unpackListener(Any any) {
+        try {
+            return any.unpack(Listener.class);
+        } catch (InvalidProtocolBufferException e) {
+            logger.error("Error occur when decode xDS response.", e);
+            return null;
+        }
+    }
+
+    private static HttpConnectionManager unpackHttpConnectionManager(Any any) {
+        try {
+            return any.unpack(HttpConnectionManager.class);
+        } catch (InvalidProtocolBufferException e) {
+            logger.error("Error occur when decode xDS response.", e);
+            return null;
+        }
+    }
+}
diff --git a/dubbo-registry/dubbo-registry-xds/src/main/java/org/apache/dubbo/registry/xds/util/protocol/impl/RdsProtocol.java b/dubbo-registry/dubbo-registry-xds/src/main/java/org/apache/dubbo/registry/xds/util/protocol/impl/RdsProtocol.java
new file mode 100644
index 0000000..a85b2ea
--- /dev/null
+++ b/dubbo-registry/dubbo-registry-xds/src/main/java/org/apache/dubbo/registry/xds/util/protocol/impl/RdsProtocol.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dubbo.registry.xds.util.protocol.impl;
+
+import org.apache.dubbo.common.logger.Logger;
+import org.apache.dubbo.common.logger.LoggerFactory;
+import org.apache.dubbo.registry.xds.util.XdsChannel;
+import org.apache.dubbo.registry.xds.util.protocol.AbstractProtocol;
+import org.apache.dubbo.registry.xds.util.protocol.delta.DeltaRoute;
+import org.apache.dubbo.registry.xds.util.protocol.message.RouteResult;
+
+import com.google.protobuf.Any;
+import com.google.protobuf.InvalidProtocolBufferException;
+import io.envoyproxy.envoy.config.core.v3.Node;
+import io.envoyproxy.envoy.config.route.v3.Route;
+import io.envoyproxy.envoy.config.route.v3.RouteAction;
+import io.envoyproxy.envoy.config.route.v3.RouteConfiguration;
+import io.envoyproxy.envoy.service.discovery.v3.DeltaDiscoveryResponse;
+import io.envoyproxy.envoy.service.discovery.v3.DiscoveryResponse;
+import io.envoyproxy.envoy.service.discovery.v3.Resource;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+public class RdsProtocol extends AbstractProtocol<RouteResult, DeltaRoute> {
+
+    private static final Logger logger = LoggerFactory.getLogger(AbstractProtocol.class);
+
+    public RdsProtocol(XdsChannel xdsChannel, Node node) {
+        super(xdsChannel, node);
+    }
+
+    @Override
+    public String getTypeUrl() {
+        return "type.googleapis.com/envoy.config.route.v3.RouteConfiguration";
+    }
+
+    @Override
+    protected RouteResult decodeDiscoveryResponse(DiscoveryResponse response) {
+        if (getTypeUrl().equals(response.getTypeUrl())) {
+            Map<String, Set<String>> map = response.getResourcesList().stream()
+                    .map(RdsProtocol::unpackRouteConfiguration)
+                    .filter(Objects::nonNull)
+                    .map(RdsProtocol::decodeResourceToListener)
+                    .reduce((a, b) -> {
+                        a.putAll(b);
+                        return a;
+                    }).orElse(new HashMap<>());
+            return new RouteResult(map);
+        }
+        return new RouteResult();
+    }
+
+    @Override
+    protected DeltaRoute decodeDeltaDiscoveryResponse(DeltaDiscoveryResponse response, DeltaRoute previous) {
+        DeltaRoute deltaRoute = previous;
+        if (deltaRoute == null) {
+            deltaRoute = new DeltaRoute();
+        }
+        if (getTypeUrl().equals(response.getTypeUrl())) {
+            deltaRoute.removeResource(response.getRemovedResourcesList());
+            for (Resource resource : response.getResourcesList()) {
+                RouteConfiguration unpackedResource = unpackRouteConfiguration(resource.getResource());
+                if (unpackedResource == null) {
+                    continue;
+                }
+                deltaRoute.addResource(resource.getName(), decodeResourceToListener(unpackedResource));
+            }
+        }
+        return deltaRoute;
+    }
+
+    private static Map<String, Set<String>> decodeResourceToListener(RouteConfiguration resource) {
+        Map<String, Set<String>> map = new HashMap<>();
+        resource.getVirtualHostsList()
+                .forEach(virtualHost -> {
+                    Set<String> cluster = virtualHost.getRoutesList().stream()
+                            .map(Route::getRoute)
+                            .map(RouteAction::getCluster)
+                            .collect(Collectors.toSet());
+                    for (String domain : virtualHost.getDomainsList()) {
+                        map.put(domain, cluster);
+                    }
+                });
+        return map;
+    }
+
+    private static RouteConfiguration unpackRouteConfiguration(Any any) {
+        try {
+            return any.unpack(RouteConfiguration.class);
+        } catch (InvalidProtocolBufferException e) {
+            logger.error("Error occur when decode xDS response.", e);
+            return null;
+        }
+    }
+
+
+}
diff --git a/dubbo-registry/dubbo-registry-xds/src/main/java/org/apache/dubbo/registry/xds/util/protocol/message/Endpoint.java b/dubbo-registry/dubbo-registry-xds/src/main/java/org/apache/dubbo/registry/xds/util/protocol/message/Endpoint.java
new file mode 100644
index 0000000..b966bda
--- /dev/null
+++ b/dubbo-registry/dubbo-registry-xds/src/main/java/org/apache/dubbo/registry/xds/util/protocol/message/Endpoint.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dubbo.registry.xds.util.protocol.message;
+
+import java.util.Objects;
+
+public class Endpoint {
+    private String address;
+    private int portValue;
+    private boolean healthy;
+    private int weight;
+
+    public String getAddress() {
+        return address;
+    }
+
+    public void setAddress(String address) {
+        this.address = address;
+    }
+
+    public int getPortValue() {
+        return portValue;
+    }
+
+    public void setPortValue(int portValue) {
+        this.portValue = portValue;
+    }
+
+    public boolean isHealthy() {
+        return healthy;
+    }
+
+    public void setHealthy(boolean healthy) {
+        this.healthy = healthy;
+    }
+
+    public int getWeight() {
+        return weight;
+    }
+
+    public void setWeight(int weight) {
+        this.weight = weight;
+    }
+
+    @Override
+    public String toString() {
+        return "Endpoint{" +
+                "address='" + address + '\'' +
+                ", portValue='" + portValue + '\'' +
+                ", healthy=" + healthy +
+                ", weight=" + weight +
+                '}';
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+        Endpoint endpoint = (Endpoint) o;
+        return healthy == endpoint.healthy &&
+                weight == endpoint.weight &&
+                Objects.equals(address, endpoint.address) &&
+                Objects.equals(portValue, endpoint.portValue);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(address, portValue, healthy, weight);
+    }
+}
diff --git a/dubbo-registry/dubbo-registry-xds/src/main/java/org/apache/dubbo/registry/xds/util/protocol/message/EndpointResult.java b/dubbo-registry/dubbo-registry-xds/src/main/java/org/apache/dubbo/registry/xds/util/protocol/message/EndpointResult.java
new file mode 100644
index 0000000..fc75eee
--- /dev/null
+++ b/dubbo-registry/dubbo-registry-xds/src/main/java/org/apache/dubbo/registry/xds/util/protocol/message/EndpointResult.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dubbo.registry.xds.util.protocol.message;
+
+import org.apache.dubbo.common.utils.ConcurrentHashSet;
+
+import java.util.Objects;
+import java.util.Set;
+
+public class EndpointResult {
+    private Set<Endpoint> endpoints;
+
+    public EndpointResult() {
+        this.endpoints = new ConcurrentHashSet<>();
+    }
+
+    public EndpointResult(Set<Endpoint> endpoints) {
+        this.endpoints = endpoints;
+    }
+
+    public Set<Endpoint> getEndpoints() {
+        return endpoints;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+        EndpointResult that = (EndpointResult) o;
+        return Objects.equals(endpoints, that.endpoints);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(endpoints);
+    }
+
+    @Override
+    public String toString() {
+        return "EndpointResult{" +
+                "endpoints=" + endpoints +
+                '}';
+    }
+}
diff --git a/dubbo-registry/dubbo-registry-xds/src/main/java/org/apache/dubbo/registry/xds/util/protocol/message/ListenerResult.java b/dubbo-registry/dubbo-registry-xds/src/main/java/org/apache/dubbo/registry/xds/util/protocol/message/ListenerResult.java
new file mode 100644
index 0000000..f6d6a9c
--- /dev/null
+++ b/dubbo-registry/dubbo-registry-xds/src/main/java/org/apache/dubbo/registry/xds/util/protocol/message/ListenerResult.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dubbo.registry.xds.util.protocol.message;
+
+import org.apache.dubbo.common.utils.ConcurrentHashSet;
+
+import java.util.Objects;
+import java.util.Set;
+
+public class ListenerResult {
+    private Set<String> routeConfigNames;
+
+    public ListenerResult() {
+        this.routeConfigNames = new ConcurrentHashSet<>();
+    }
+
+    public ListenerResult(Set<String> routeConfigNames) {
+        this.routeConfigNames = routeConfigNames;
+    }
+
+    public Set<String> getRouteConfigNames() {
+        return routeConfigNames;
+    }
+
+    public void setRouteConfigNames(Set<String> routeConfigNames) {
+        this.routeConfigNames = routeConfigNames;
+    }
+
+    public void mergeRouteConfigNames(Set<String> names) {
+        this.routeConfigNames.addAll(names);
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+        ListenerResult listenerResult = (ListenerResult) o;
+        return Objects.equals(routeConfigNames, listenerResult.routeConfigNames);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(routeConfigNames);
+    }
+
+    @Override
+    public String toString() {
+        return "ListenerResult{" +
+                "routeConfigNames=" + routeConfigNames +
+                '}';
+    }
+}
diff --git a/dubbo-registry/dubbo-registry-xds/src/main/java/org/apache/dubbo/registry/xds/util/protocol/message/RouteResult.java b/dubbo-registry/dubbo-registry-xds/src/main/java/org/apache/dubbo/registry/xds/util/protocol/message/RouteResult.java
new file mode 100644
index 0000000..3aac915
--- /dev/null
+++ b/dubbo-registry/dubbo-registry-xds/src/main/java/org/apache/dubbo/registry/xds/util/protocol/message/RouteResult.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dubbo.registry.xds.util.protocol.message;
+
+import org.apache.dubbo.common.utils.ConcurrentHashSet;
+
+import java.util.Collections;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+
+public class RouteResult {
+    private final Map<String, Set<String>> domainMap;
+
+    public RouteResult() {
+        this.domainMap = new ConcurrentHashMap<>();
+    }
+
+    public RouteResult(Map<String, Set<String>> domainMap) {
+        this.domainMap = domainMap;
+    }
+
+    public boolean isNotEmpty() {
+        return !domainMap.isEmpty();
+    }
+
+    public Set<String> searchDomain(String domain) {
+        return domainMap.getOrDefault(domain, new ConcurrentHashSet<>());
+    }
+
+    public Set<String> getDomains() {
+        return Collections.unmodifiableSet(domainMap.keySet());
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+        RouteResult that = (RouteResult) o;
+        return Objects.equals(domainMap, that.domainMap);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(domainMap);
+    }
+
+    @Override
+    public String toString() {
+        return "RouteResult{" +
+                "domainMap=" + domainMap +
+                '}';
+    }
+}
diff --git a/dubbo-registry/dubbo-registry-xds/src/main/resources/META-INF/dubbo/internal/org.apache.dubbo.registry.RegistryFactory b/dubbo-registry/dubbo-registry-xds/src/main/resources/META-INF/dubbo/internal/org.apache.dubbo.registry.RegistryFactory
new file mode 100644
index 0000000..453d3d3
--- /dev/null
+++ b/dubbo-registry/dubbo-registry-xds/src/main/resources/META-INF/dubbo/internal/org.apache.dubbo.registry.RegistryFactory
@@ -0,0 +1 @@
+xds=org.apache.dubbo.registry.xds.XdsRegistryFactory
\ No newline at end of file
diff --git a/dubbo-registry/dubbo-registry-xds/src/main/resources/META-INF/dubbo/internal/org.apache.dubbo.registry.client.ServiceDiscovery b/dubbo-registry/dubbo-registry-xds/src/main/resources/META-INF/dubbo/internal/org.apache.dubbo.registry.client.ServiceDiscovery
new file mode 100644
index 0000000..6fe7bd6
--- /dev/null
+++ b/dubbo-registry/dubbo-registry-xds/src/main/resources/META-INF/dubbo/internal/org.apache.dubbo.registry.client.ServiceDiscovery
@@ -0,0 +1 @@
+xds=org.apache.dubbo.registry.xds.XdsServiceDiscovery
\ No newline at end of file
diff --git a/dubbo-registry/dubbo-registry-xds/src/main/resources/META-INF/dubbo/internal/org.apache.dubbo.registry.client.ServiceDiscoveryFactory b/dubbo-registry/dubbo-registry-xds/src/main/resources/META-INF/dubbo/internal/org.apache.dubbo.registry.client.ServiceDiscoveryFactory
new file mode 100644
index 0000000..0335fa1
--- /dev/null
+++ b/dubbo-registry/dubbo-registry-xds/src/main/resources/META-INF/dubbo/internal/org.apache.dubbo.registry.client.ServiceDiscoveryFactory
@@ -0,0 +1 @@
+xds=org.apache.dubbo.registry.xds.XdsServiceDiscoveryFactory
\ No newline at end of file
diff --git a/dubbo-registry/pom.xml b/dubbo-registry/pom.xml
index db3ee35..a652eb8 100644
--- a/dubbo-registry/pom.xml
+++ b/dubbo-registry/pom.xml
@@ -37,5 +37,6 @@
         <module>dubbo-registry-multiple</module>
         <module>dubbo-registry-kubernetes</module>
         <module>dubbo-registry-dns</module>
+        <module>dubbo-registry-xds</module>
     </modules>
 </project>