You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dubbo.apache.org by al...@apache.org on 2022/07/18 09:36:16 UTC

[dubbo-spi-extensions] branch master updated: Enhance remoting module (#131)

This is an automated email from the ASF dual-hosted git repository.

albumenj pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/dubbo-spi-extensions.git


The following commit(s) were added to refs/heads/master by this push:
     new ce25638  Enhance remoting module (#131)
ce25638 is described below

commit ce25638bf8d4eb9ff1eccda7cc23d9fec35bd397
Author: Albumen Kevin <jh...@gmail.com>
AuthorDate: Mon Jul 18 17:36:11 2022 +0800

    Enhance remoting module (#131)
    
    * add etcd3 remoting
    
    * add grizzly remoting
    
    * add mina remoting
    
    * add p2p remoting
    
    * add redis remoting
    
    * opt pom
    
    * fix compile
    
    * fix compile
---
 dubbo-extensions-dependencies-bom/pom.xml          |  84 ++-
 .../dubbo-remoting-etcd3/pom.xml                   | 107 +++
 .../dubbo/remoting/etcd/AbstractRetryPolicy.java   |  45 ++
 .../apache/dubbo/remoting/etcd/ChildListener.java  |  25 +
 .../org/apache/dubbo/remoting/etcd/Constants.java  |  55 ++
 .../org/apache/dubbo/remoting/etcd/EtcdClient.java | 191 ++++++
 .../dubbo/remoting/etcd/EtcdTransporter.java       |  47 ++
 .../apache/dubbo/remoting/etcd/RetryPolicy.java    |  31 +
 .../apache/dubbo/remoting/etcd/StateListener.java  |  27 +
 .../etcd/jetcd/ConnectionStateListener.java        |  31 +
 .../dubbo/remoting/etcd/jetcd/JEtcdClient.java     | 473 +++++++++++++
 .../remoting/etcd/jetcd/JEtcdClientWrapper.java    | 752 +++++++++++++++++++++
 .../remoting/etcd/jetcd/JEtcdTransporter.java      |  30 +
 .../dubbo/remoting/etcd/jetcd/RetryLoops.java      | 100 +++
 .../dubbo/remoting/etcd/jetcd/RetryNTimes.java     |  36 +
 .../dubbo/remoting/etcd/option/OptionUtil.java     |  73 ++
 .../remoting/etcd/support/AbstractEtcdClient.java  | 194 ++++++
 .../org.apache.dubbo.remoting.etcd.EtcdTransporter |   1 +
 .../dubbo/remoting/etcd/jetcd/JEtcdClientTest.java | 428 ++++++++++++
 .../etcd/jetcd/JEtcdClientWrapperTest.java         | 187 +++++
 .../dubbo/remoting/etcd/jetcd/LeaseTest.java       | 156 +++++
 .../{ => dubbo-remoting-grizzly}/pom.xml           |  32 +-
 .../remoting/transport/grizzly/GrizzlyChannel.java | 198 ++++++
 .../remoting/transport/grizzly/GrizzlyClient.java  | 112 +++
 .../transport/grizzly/GrizzlyCodecAdapter.java     | 143 ++++
 .../remoting/transport/grizzly/GrizzlyHandler.java | 118 ++++
 .../remoting/transport/grizzly/GrizzlyServer.java  | 129 ++++
 .../transport/grizzly/GrizzlyTransporter.java      |  43 ++
 .../internal/org.apache.dubbo.remoting.Transporter |   1 +
 .../transport/grizzly/GrizzlyTransporterTest.java  |  41 ++
 .../dubbo-remoting-mina/pom.xml                    |  51 ++
 .../dubbo/remoting/transport/mina/MinaChannel.java | 191 ++++++
 .../dubbo/remoting/transport/mina/MinaClient.java  | 174 +++++
 .../remoting/transport/mina/MinaCodecAdapter.java  | 167 +++++
 .../dubbo/remoting/transport/mina/MinaHandler.java |  95 +++
 .../dubbo/remoting/transport/mina/MinaServer.java  | 112 +++
 .../remoting/transport/mina/MinaTransporter.java   |  40 ++
 .../internal/org.apache.dubbo.remoting.Transporter |   1 +
 .../transport/mina/ClientToServerTest.java         |  92 +++
 .../remoting/transport/mina/ClientsTest.java       |  65 ++
 .../org/apache/remoting/transport/mina/Hello.java  |  45 ++
 .../transport/mina/MinaClientToServerTest.java     |  41 ++
 .../org/apache/remoting/transport/mina/World.java  |  45 ++
 .../remoting/transport/mina/WorldHandler.java      |  36 +
 .../{ => dubbo-remoting-p2p}/pom.xml               |  33 +-
 .../java/org/apache/dubbo/remoting/p2p/Group.java  |  57 ++
 .../org/apache/dubbo/remoting/p2p/Networker.java   |  39 ++
 .../org/apache/dubbo/remoting/p2p/Networkers.java  |  47 ++
 .../java/org/apache/dubbo/remoting/p2p/Peer.java   |  36 +
 .../dubbo/remoting/p2p/exchange/ExchangeGroup.java |  36 +
 .../remoting/p2p/exchange/ExchangeNetworker.java   |  35 +
 .../remoting/p2p/exchange/ExchangeNetworkers.java  |  45 ++
 .../dubbo/remoting/p2p/exchange/ExchangePeer.java  |  26 +
 .../exchange/support/AbstractExchangeGroup.java    | 128 ++++
 .../p2p/exchange/support/ExchangeServerPeer.java   | 137 ++++
 .../p2p/exchange/support/FileExchangeGroup.java    | 135 ++++
 .../exchange/support/FileExchangeNetworker.java    |  34 +
 .../exchange/support/MulticastExchangeGroup.java   | 108 +++
 .../support/MulticastExchangeNetworker.java        |  34 +
 .../dubbo/remoting/p2p/support/AbstractGroup.java  | 119 ++++
 .../dubbo/remoting/p2p/support/FileGroup.java      | 133 ++++
 .../dubbo/remoting/p2p/support/FileNetworker.java  |  34 +
 .../dubbo/remoting/p2p/support/MulticastGroup.java | 108 +++
 .../remoting/p2p/support/MulticastNetworker.java   |  34 +
 .../dubbo/remoting/p2p/support/ServerPeer.java     | 124 ++++
 .../org.apache.dubbo.remoting.p2p.Networker        |   2 +
 .../support/MulticastExchangeNetworkerTest.java    |  81 +++
 .../remoting/p2p/support/FileNetworkerTest.java    |  83 +++
 .../p2p/support/MulticastNetworkerTest.java        |  71 ++
 .../dubbo-remoting-redis/pom.xml                   |  53 ++
 .../apache/dubbo/remoting/redis/RedisClient.java   |  46 ++
 .../remoting/redis/jedis/ClusterRedisClient.java   | 136 ++++
 .../remoting/redis/jedis/MonoRedisClient.java      | 119 ++++
 .../remoting/redis/jedis/SentinelRedisClient.java  | 122 ++++
 .../redis/support/AbstractRedisClient.java         |  95 +++
 dubbo-remoting-extensions/pom.xml                  |   5 +
 76 files changed, 7311 insertions(+), 29 deletions(-)

diff --git a/dubbo-extensions-dependencies-bom/pom.xml b/dubbo-extensions-dependencies-bom/pom.xml
index ffaf218..977804d 100644
--- a/dubbo-extensions-dependencies-bom/pom.xml
+++ b/dubbo-extensions-dependencies-bom/pom.xml
@@ -116,6 +116,13 @@
         <kryo_serializers_version>0.42</kryo_serializers_version>
         <msgpack_version>0.8.22</msgpack_version>
         <protostuff_version>1.5.9</protostuff_version>
+        <mina_version>1.1.7</mina_version>
+        <slf4j_version>1.7.25</slf4j_version>
+        <grizzly_version>2.4.4</grizzly_version>
+        <jetcd_version>0.5.7</jetcd_version>
+        <grpc.version>1.31.1</grpc.version>
+        <etcd_launcher_version>0.5.7</etcd_launcher_version>
+        <netty4_version>4.1.66.Final</netty4_version>
 
         <maven_flatten_version>1.2.5</maven_flatten_version>
     </properties>
@@ -313,7 +320,82 @@
                 <artifactId>protostuff-runtime</artifactId>
                 <version>${protostuff_version}</version>
             </dependency>
-
+            <dependency>
+                <groupId>org.apache.mina</groupId>
+                <artifactId>mina-core</artifactId>
+                <version>${mina_version}</version>
+            </dependency>
+            <dependency>
+                <groupId>org.slf4j</groupId>
+                <artifactId>slf4j-api</artifactId>
+                <version>${slf4j_version}</version>
+            </dependency>
+            <dependency>
+                <groupId>org.glassfish.grizzly</groupId>
+                <artifactId>grizzly-core</artifactId>
+                <version>${grizzly_version}</version>
+            </dependency>
+            <dependency>
+                <groupId>io.etcd</groupId>
+                <artifactId>jetcd-core</artifactId>
+                <version>${jetcd_version}</version>
+                <exclusions>
+                    <exclusion>
+                        <groupId>io.netty</groupId>
+                        <artifactId>netty-codec-http2</artifactId>
+                    </exclusion>
+                    <exclusion>
+                        <groupId>io.netty</groupId>
+                        <artifactId>netty-handler-proxy</artifactId>
+                    </exclusion>
+                </exclusions>
+            </dependency>
+            <dependency>
+                <groupId>io.grpc</groupId>
+                <artifactId>grpc-core</artifactId>
+                <version>${grpc.version}</version>
+            </dependency>
+            <dependency>
+                <groupId>io.grpc</groupId>
+                <artifactId>grpc-netty-shaded</artifactId>
+                <version>${grpc.version}</version>
+            </dependency>
+            <dependency>
+                <groupId>io.grpc</groupId>
+                <artifactId>grpc-netty</artifactId>
+                <version>${grpc.version}</version>
+            </dependency>
+            <dependency>
+                <groupId>io.grpc</groupId>
+                <artifactId>grpc-protobuf</artifactId>
+                <version>${grpc.version}</version>
+            </dependency>
+            <dependency>
+                <groupId>io.grpc</groupId>
+                <artifactId>grpc-stub</artifactId>
+                <version>${grpc.version}</version>
+            </dependency>
+            <dependency>
+                <groupId>io.grpc</groupId>
+                <artifactId>grpc-grpclb</artifactId>
+                <version>${grpc.version}</version>
+            </dependency>
+            <dependency>
+                <groupId>io.etcd</groupId>
+                <artifactId>jetcd-launcher</artifactId>
+                <version>${etcd_launcher_version}</version>
+                <exclusions>
+                    <exclusion>
+                        <groupId>com.github.spotbugs</groupId>
+                        <artifactId>spotbugs-annotations</artifactId>
+                    </exclusion>
+                </exclusions>
+            </dependency>
+            <dependency>
+                <groupId>io.netty</groupId>
+                <artifactId>netty-all</artifactId>
+                <version>${netty4_version}</version>
+            </dependency>
         </dependencies>
     </dependencyManagement>
 
diff --git a/dubbo-remoting-extensions/dubbo-remoting-etcd3/pom.xml b/dubbo-remoting-extensions/dubbo-remoting-etcd3/pom.xml
new file mode 100644
index 0000000..dd7df73
--- /dev/null
+++ b/dubbo-remoting-extensions/dubbo-remoting-etcd3/pom.xml
@@ -0,0 +1,107 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one or more
+  ~ contributor license agreements.  See the NOTICE file distributed with
+  ~ this work for additional information regarding copyright ownership.
+  ~ The ASF licenses this file to You under the Apache License, Version 2.0
+  ~ (the "License"); you may not use this file except in compliance with
+  ~ the License.  You may obtain a copy of the License at
+  ~
+  ~     http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  -->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <parent>
+        <artifactId>dubbo-remoting-extensions</artifactId>
+        <groupId>org.apache.dubbo.extensions</groupId>
+        <version>${revision}</version>
+        <relativePath>../pom.xml</relativePath>
+    </parent>
+
+    <modelVersion>4.0.0</modelVersion>
+    <version>1.0.0-SNAPSHOT</version>
+    <artifactId>dubbo-remoting-etcd3</artifactId>
+    <packaging>jar</packaging>
+    <name>${project.artifactId}</name>
+    <description>The etcd3 remoting module of Dubbo project</description>
+
+    <properties>
+        <assertj.version>3.13.2</assertj.version>
+        <skipIntegrationTests>true</skipIntegrationTests>
+    </properties>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.dubbo</groupId>
+            <artifactId>dubbo-remoting-api</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.dubbo</groupId>
+            <artifactId>dubbo-common</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>io.etcd</groupId>
+            <artifactId>jetcd-core</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>io.grpc</groupId>
+            <artifactId>grpc-core</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>io.grpc</groupId>
+            <artifactId>grpc-netty</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>io.grpc</groupId>
+            <artifactId>grpc-protobuf</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>io.grpc</groupId>
+            <artifactId>grpc-stub</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>io.grpc</groupId>
+            <artifactId>grpc-grpclb</artifactId>
+        </dependency>
+        <!-- https://mvnrepository.com/artifact/io.etcd/jetcd-launcher -->
+        <dependency>
+            <groupId>io.etcd</groupId>
+            <artifactId>jetcd-launcher</artifactId>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>io.netty</groupId>
+            <artifactId>netty-all</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.assertj</groupId>
+            <artifactId>assertj-core</artifactId>
+            <version>${assertj.version}</version>
+            <scope>test</scope>
+        </dependency>
+
+    </dependencies>
+
+    <build>
+        <plugins>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-surefire-plugin</artifactId>
+                <configuration>
+                    <skipTests>${skipIntegrationTests}</skipTests>
+                </configuration>
+            </plugin>
+        </plugins>
+    </build>
+
+
+</project>
diff --git a/dubbo-remoting-extensions/dubbo-remoting-etcd3/src/main/java/org/apache/dubbo/remoting/etcd/AbstractRetryPolicy.java b/dubbo-remoting-extensions/dubbo-remoting-etcd3/src/main/java/org/apache/dubbo/remoting/etcd/AbstractRetryPolicy.java
new file mode 100644
index 0000000..b084ba3
--- /dev/null
+++ b/dubbo-remoting-extensions/dubbo-remoting-etcd3/src/main/java/org/apache/dubbo/remoting/etcd/AbstractRetryPolicy.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dubbo.remoting.etcd;
+
+public abstract class AbstractRetryPolicy implements RetryPolicy {
+
+    private final int maxRetried;
+
+    protected AbstractRetryPolicy(int maxRetried) {
+        this.maxRetried = maxRetried;
+    }
+
+    @Override
+    public boolean shouldRetry(int retried, long elapsed, boolean sleep) {
+        if (retried < maxRetried) {
+            try {
+                if (sleep) {
+                    Thread.sleep(getSleepTime(retried, elapsed));
+                }
+            } catch (InterruptedException e) {
+                Thread.currentThread().interrupt();
+                return false;
+            }
+            return true;
+        }
+        return false;
+    }
+
+    protected abstract long getSleepTime(int retried, long elapsed);
+
+}
diff --git a/dubbo-remoting-extensions/dubbo-remoting-etcd3/src/main/java/org/apache/dubbo/remoting/etcd/ChildListener.java b/dubbo-remoting-extensions/dubbo-remoting-etcd3/src/main/java/org/apache/dubbo/remoting/etcd/ChildListener.java
new file mode 100644
index 0000000..46a0af8
--- /dev/null
+++ b/dubbo-remoting-extensions/dubbo-remoting-etcd3/src/main/java/org/apache/dubbo/remoting/etcd/ChildListener.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dubbo.remoting.etcd;
+
+import java.util.List;
+
+public interface ChildListener {
+
+    void childChanged(String path, List<String> children);
+
+}
diff --git a/dubbo-remoting-extensions/dubbo-remoting-etcd3/src/main/java/org/apache/dubbo/remoting/etcd/Constants.java b/dubbo-remoting-extensions/dubbo-remoting-etcd3/src/main/java/org/apache/dubbo/remoting/etcd/Constants.java
new file mode 100644
index 0000000..3450bb5
--- /dev/null
+++ b/dubbo-remoting-extensions/dubbo-remoting-etcd3/src/main/java/org/apache/dubbo/remoting/etcd/Constants.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dubbo.remoting.etcd;
+
+import static org.apache.dubbo.remoting.Constants.DEFAULT_IO_THREADS;
+
+public interface Constants {
+    String ETCD3_NOTIFY_MAXTHREADS_KEYS = "etcd3.notify.maxthreads";
+
+    int DEFAULT_ETCD3_NOTIFY_THREADS = DEFAULT_IO_THREADS;
+
+    String DEFAULT_ETCD3_NOTIFY_QUEUES_KEY = "etcd3.notify.queues";
+
+    int DEFAULT_GRPC_QUEUES = 300_0000;
+
+    String RETRY_PERIOD_KEY = "retry.period";
+
+    int DEFAULT_RETRY_PERIOD = 5 * 1000;
+
+    int DEFAULT_SESSION_TIMEOUT = 60 * 1000;
+
+    String HTTP_SUBFIX_KEY = "://";
+
+    String HTTP_KEY = "http://";
+
+    int DEFAULT_KEEPALIVE_TIMEOUT = DEFAULT_SESSION_TIMEOUT / 2;
+
+    String SESSION_TIMEOUT_KEY = "session";
+
+    int DEFAULT_RECONNECT_PERIOD = 3 * 1000;
+
+    String ROUTERS_CATEGORY = "routers";
+
+    String PROVIDERS_CATEGORY = "providers";
+
+    String CONSUMERS_CATEGORY = "consumers";
+
+    String CONFIGURATORS_CATEGORY = "configurators";
+}
+
diff --git a/dubbo-remoting-extensions/dubbo-remoting-etcd3/src/main/java/org/apache/dubbo/remoting/etcd/EtcdClient.java b/dubbo-remoting-extensions/dubbo-remoting-etcd3/src/main/java/org/apache/dubbo/remoting/etcd/EtcdClient.java
new file mode 100644
index 0000000..45b54b0
--- /dev/null
+++ b/dubbo-remoting-extensions/dubbo-remoting-etcd3/src/main/java/org/apache/dubbo/remoting/etcd/EtcdClient.java
@@ -0,0 +1,191 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dubbo.remoting.etcd;
+
+import org.apache.dubbo.common.URL;
+
+import java.util.List;
+import java.util.concurrent.CancellationException;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+public interface EtcdClient {
+
+    /**
+     * save the specified path to the etcd registry.
+     *
+     * @param path the path to be saved
+     */
+    void create(String path);
+
+    /**
+     * save the specified path to the etcd registry.
+     * if node disconnect from etcd, it will be deleted
+     * automatically by etcd when session timeout.
+     *
+     * @param path the path to be saved
+     * @return the lease of current path.
+     */
+    long createEphemeral(String path);
+
+    /**
+     * remove the specified  from etcd registry.
+     *
+     * @param path the path to be removed
+     */
+    void delete(String path);
+
+    /**
+     * find direct children directory, excluding path self,
+     * Never return null.
+     *
+     * @param path the path to be found direct children.
+     * @return direct children directory, contains zero element
+     * list if children directory not exists.
+     */
+    List<String> getChildren(String path);
+
+    /**
+     * register children listener for specified path.
+     *
+     * @param path     the path to be watched when children is added, delete or update.
+     * @param listener when children is changed , listener will be triggered.
+     * @return direct children directory, contains zero element
+     * list if children directory not exists.
+     */
+    List<String> addChildListener(String path, ChildListener listener);
+
+    /**
+     * find watcher of the children listener for specified path.
+     *
+     * @param path     the path to be watched when children is added, delete or update.
+     * @param listener when children is changed , listener will be triggered.
+     * @return watcher if find else null
+     */
+    <T> T getChildListener(String path, ChildListener listener);
+
+    /**
+     * unregister children lister for specified path.
+     *
+     * @param path     the path to be unwatched .
+     * @param listener when children is changed , lister will be triggered.
+     */
+    void removeChildListener(String path, ChildListener listener);
+
+    /**
+     * support connection notify if connection state was changed.
+     *
+     * @param listener if state changed, listener will be triggered.
+     */
+    void addStateListener(StateListener listener);
+
+    /**
+     * remove connection notify if connection state was changed.
+     *
+     * @param listener remove already registered listener, if listener
+     *                 not exists nothing happened.
+     */
+    void removeStateListener(StateListener listener);
+
+    /**
+     * test if current client is active.
+     *
+     * @return true if connection is active else false.
+     */
+    boolean isConnected();
+
+    /**
+     * close current client and release all resourses.
+     */
+    void close();
+
+    URL getUrl();
+
+    /***
+     * create new lease from specified second ,it should be waiting if failed.<p>
+     *
+     * @param second lease time (support second only).
+     * @return lease id from etcd
+     */
+    long createLease(long second);
+
+    /***
+     * create new lease from specified ttl second before waiting specified timeout.<p>
+     *
+     * @param ttl lease time (support second only).
+     * @param timeout the maximum time to wait
+     * @param unit the time unit of the timeout argument
+     * @throws CancellationException if this future was cancelled
+     * @throws ExecutionException if this future completed exceptionally
+     * @throws InterruptedException if the current thread was interrupted
+     * while waiting
+     * @throws TimeoutException if the wait timed out
+     * @return lease id from etcd
+     */
+    long createLease(long ttl, long timeout, TimeUnit unit)
+            throws InterruptedException, ExecutionException, TimeoutException;
+
+    /**
+     * revoke specified lease, any associated path will removed automatically.
+     *
+     * @param lease to be removed lease
+     */
+    void revokeLease(long lease);
+
+
+    /**
+     * Get the value of the specified key.
+     * @param key the specified key
+     * @return null if the value is not found
+     */
+    String getKVValue(String key);
+
+    /**
+     * Put the key value pair to etcd
+     * @param key the specified key
+     * @param value the paired value
+     * @return true if put success
+     */
+    boolean put(String key, String value);
+
+    /**
+     * Put the key value pair to etcd (Ephemeral)
+     * @param key the specified key
+     * @param value the paired value
+     * @return true if put success
+     */
+    boolean putEphemeral(String key, String value);
+
+}
diff --git a/dubbo-remoting-extensions/dubbo-remoting-etcd3/src/main/java/org/apache/dubbo/remoting/etcd/EtcdTransporter.java b/dubbo-remoting-extensions/dubbo-remoting-etcd3/src/main/java/org/apache/dubbo/remoting/etcd/EtcdTransporter.java
new file mode 100644
index 0000000..754ba64
--- /dev/null
+++ b/dubbo-remoting-extensions/dubbo-remoting-etcd3/src/main/java/org/apache/dubbo/remoting/etcd/EtcdTransporter.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dubbo.remoting.etcd;
+
+import org.apache.dubbo.common.URL;
+import org.apache.dubbo.common.extension.Adaptive;
+import org.apache.dubbo.common.extension.SPI;
+import org.apache.dubbo.remoting.Constants;
+
+@SPI("jetcd")
+public interface EtcdTransporter {
+
+    @Adaptive({Constants.CLIENT_KEY, Constants.TRANSPORTER_KEY})
+    EtcdClient connect(URL url);
+
+}
diff --git a/dubbo-remoting-extensions/dubbo-remoting-etcd3/src/main/java/org/apache/dubbo/remoting/etcd/RetryPolicy.java b/dubbo-remoting-extensions/dubbo-remoting-etcd3/src/main/java/org/apache/dubbo/remoting/etcd/RetryPolicy.java
new file mode 100644
index 0000000..6133ab7
--- /dev/null
+++ b/dubbo-remoting-extensions/dubbo-remoting-etcd3/src/main/java/org/apache/dubbo/remoting/etcd/RetryPolicy.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dubbo.remoting.etcd;
+
+public interface RetryPolicy {
+
+    /**
+     * Whether retry is supported when operation fails.
+     *
+     * @param retried the number of times retried so far
+     * @param elapsed the elapsed time in millisecond since the operation was attempted
+     * @param sleep   should be sleep
+     * @return true should be retry
+     */
+    boolean shouldRetry(int retried, long elapsed, boolean sleep);
+
+}
diff --git a/dubbo-remoting-extensions/dubbo-remoting-etcd3/src/main/java/org/apache/dubbo/remoting/etcd/StateListener.java b/dubbo-remoting-extensions/dubbo-remoting-etcd3/src/main/java/org/apache/dubbo/remoting/etcd/StateListener.java
new file mode 100644
index 0000000..4358083
--- /dev/null
+++ b/dubbo-remoting-extensions/dubbo-remoting-etcd3/src/main/java/org/apache/dubbo/remoting/etcd/StateListener.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dubbo.remoting.etcd;
+
+public interface StateListener {
+
+    int DISCONNECTED = 0;
+
+    int CONNECTED = 1;
+
+    void stateChanged(int connected);
+
+}
diff --git a/dubbo-remoting-extensions/dubbo-remoting-etcd3/src/main/java/org/apache/dubbo/remoting/etcd/jetcd/ConnectionStateListener.java b/dubbo-remoting-extensions/dubbo-remoting-etcd3/src/main/java/org/apache/dubbo/remoting/etcd/jetcd/ConnectionStateListener.java
new file mode 100644
index 0000000..729a157
--- /dev/null
+++ b/dubbo-remoting-extensions/dubbo-remoting-etcd3/src/main/java/org/apache/dubbo/remoting/etcd/jetcd/ConnectionStateListener.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dubbo.remoting.etcd.jetcd;
+
+import io.etcd.jetcd.Client;
+
+public interface ConnectionStateListener {
+
+    /**
+     * Called when there is a state change in the connection
+     *
+     * @param client   the client
+     * @param newState the new state
+     */
+    void stateChanged(Client client, int newState);
+}
diff --git a/dubbo-remoting-extensions/dubbo-remoting-etcd3/src/main/java/org/apache/dubbo/remoting/etcd/jetcd/JEtcdClient.java b/dubbo-remoting-extensions/dubbo-remoting-etcd3/src/main/java/org/apache/dubbo/remoting/etcd/jetcd/JEtcdClient.java
new file mode 100644
index 0000000..0da56fa
--- /dev/null
+++ b/dubbo-remoting-extensions/dubbo-remoting-etcd3/src/main/java/org/apache/dubbo/remoting/etcd/jetcd/JEtcdClient.java
@@ -0,0 +1,473 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dubbo.remoting.etcd.jetcd;
+
+import org.apache.dubbo.common.URL;
+import org.apache.dubbo.common.logger.Logger;
+import org.apache.dubbo.common.logger.LoggerFactory;
+import org.apache.dubbo.common.utils.ExecutorUtil;
+import org.apache.dubbo.common.utils.NamedThreadFactory;
+import org.apache.dubbo.remoting.etcd.ChildListener;
+import org.apache.dubbo.remoting.etcd.StateListener;
+import org.apache.dubbo.remoting.etcd.option.OptionUtil;
+import org.apache.dubbo.remoting.etcd.support.AbstractEtcdClient;
+
+import com.google.protobuf.ByteString;
+import io.etcd.jetcd.ByteSequence;
+import io.etcd.jetcd.api.Event;
+import io.etcd.jetcd.api.KeyValue;
+import io.etcd.jetcd.api.WatchCancelRequest;
+import io.etcd.jetcd.api.WatchCreateRequest;
+import io.etcd.jetcd.api.WatchGrpc;
+import io.etcd.jetcd.api.WatchRequest;
+import io.etcd.jetcd.api.WatchResponse;
+import io.etcd.jetcd.common.exception.ClosedClientException;
+import io.grpc.ManagedChannel;
+import io.grpc.Status;
+import io.grpc.stub.StreamObserver;
+import io.netty.util.internal.ConcurrentSet;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Random;
+import java.util.Set;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.locks.ReentrantLock;
+
+import static java.util.stream.Collectors.toList;
+import static org.apache.dubbo.common.constants.CommonConstants.PATH_SEPARATOR;
+import static org.apache.dubbo.remoting.etcd.Constants.DEFAULT_ETCD3_NOTIFY_QUEUES_KEY;
+import static org.apache.dubbo.remoting.etcd.Constants.DEFAULT_ETCD3_NOTIFY_THREADS;
+import static org.apache.dubbo.remoting.etcd.Constants.DEFAULT_GRPC_QUEUES;
+import static org.apache.dubbo.remoting.etcd.Constants.DEFAULT_RETRY_PERIOD;
+import static org.apache.dubbo.remoting.etcd.Constants.DEFAULT_SESSION_TIMEOUT;
+import static org.apache.dubbo.remoting.etcd.Constants.ETCD3_NOTIFY_MAXTHREADS_KEYS;
+import static org.apache.dubbo.remoting.etcd.Constants.RETRY_PERIOD_KEY;
+import static org.apache.dubbo.remoting.etcd.jetcd.JEtcdClientWrapper.UTF_8;
+
+/**
+ * etcd3 client.
+ */
+public class JEtcdClient extends AbstractEtcdClient<JEtcdClient.EtcdWatcher> {
+
+    private JEtcdClientWrapper clientWrapper;
+    private ScheduledExecutorService reconnectSchedule;
+
+    private ExecutorService notifyExecutor;
+
+    private int delayPeriod;
+    private Logger logger = LoggerFactory.getLogger(JEtcdClient.class);
+
+    public JEtcdClient(URL url) {
+        super(url);
+        try {
+            clientWrapper = new JEtcdClientWrapper(url);
+            clientWrapper.setConnectionStateListener((client, state) -> {
+                if (state == StateListener.CONNECTED) {
+                    JEtcdClient.this.stateChanged(StateListener.CONNECTED);
+                } else if (state == StateListener.DISCONNECTED) {
+                    JEtcdClient.this.stateChanged(StateListener.DISCONNECTED);
+                }
+            });
+            delayPeriod = getUrl().getParameter(RETRY_PERIOD_KEY, DEFAULT_RETRY_PERIOD);
+            reconnectSchedule = Executors.newScheduledThreadPool(1,
+                    new NamedThreadFactory("etcd3-watch-auto-reconnect"));
+
+            notifyExecutor = new ThreadPoolExecutor(
+                    1
+                    , url.getParameter(ETCD3_NOTIFY_MAXTHREADS_KEYS, DEFAULT_ETCD3_NOTIFY_THREADS)
+                    , DEFAULT_SESSION_TIMEOUT
+                    , TimeUnit.MILLISECONDS
+                    , new LinkedBlockingQueue<Runnable>(url.getParameter(DEFAULT_ETCD3_NOTIFY_QUEUES_KEY, DEFAULT_GRPC_QUEUES * 3))
+                    , new NamedThreadFactory("etcd3-notify", true));
+
+            clientWrapper.start();
+        } catch (Exception e) {
+            throw new IllegalStateException(e.getMessage(), e);
+        }
+    }
+
+    @Override
+    public void doCreatePersistent(String path) {
+        clientWrapper.createPersistent(path);
+    }
+
+    @Override
+    public long doCreateEphemeral(String path) {
+        return clientWrapper.createEphemeral(path);
+    }
+
+    @Override
+    public boolean checkExists(String path) {
+        return clientWrapper.checkExists(path);
+    }
+
+    @Override
+    public EtcdWatcher createChildWatcherListener(String path, ChildListener listener) {
+        return new EtcdWatcher(listener);
+    }
+
+    @Override
+    public List<String> addChildWatcherListener(String path, EtcdWatcher etcdWatcher) {
+        return etcdWatcher.forPath(path);
+    }
+
+    @Override
+    public void removeChildWatcherListener(String path, EtcdWatcher etcdWatcher) {
+        etcdWatcher.unwatch();
+    }
+
+    @Override
+    public List<String> getChildren(String path) {
+        return clientWrapper.getChildren(path);
+    }
+
+    @Override
+    public boolean isConnected() {
+        return clientWrapper.isConnected();
+    }
+
+    @Override
+    public long createLease(long second) {
+        return clientWrapper.createLease(second);
+    }
+
+    @Override
+    public long createLease(long ttl, long timeout, TimeUnit unit)
+            throws InterruptedException, ExecutionException, TimeoutException {
+        return clientWrapper.createLease(ttl, timeout, unit);
+    }
+
+    @Override
+    public void delete(String path) {
+        clientWrapper.delete(path);
+    }
+
+    @Override
+    public void revokeLease(long lease) {
+        clientWrapper.revokeLease(lease);
+    }
+
+    @Override
+    public void doClose() {
+        try {
+            if (notifyExecutor != null) {
+                ExecutorUtil.shutdownNow(notifyExecutor, 100);
+            }
+        } catch (Exception e) {
+            logger.warn(e.getMessage(), e);
+        }
+
+        try {
+            if (reconnectSchedule != null) {
+                ExecutorUtil.shutdownNow(reconnectSchedule, 100);
+            }
+        } catch (Exception e) {
+            logger.warn(e.getMessage(), e);
+        } finally {
+            clientWrapper.doClose();
+        }
+    }
+
+    @Override
+    public String getKVValue(String key) {
+        return clientWrapper.getKVValue(key);
+    }
+
+    @Override
+    public boolean put(String key, String value) {
+        return clientWrapper.put(key, value);
+    }
+
+    @Override
+    public boolean putEphemeral(String key, String value) {
+        return clientWrapper.putEphemeral(key, value);
+    }
+
+    public ManagedChannel getChannel() {
+        return clientWrapper.getChannel();
+    }
+
+    public class EtcdWatcher implements StreamObserver<WatchResponse> {
+
+        protected WatchGrpc.WatchStub watchStub;
+        protected StreamObserver<WatchRequest> watchRequest;
+        protected long watchId;
+        protected String path;
+        protected Throwable throwable;
+        protected volatile Set<String> urls = new ConcurrentSet<>();
+        private ChildListener listener;
+
+        protected ReentrantLock lock = new ReentrantLock(true);
+
+        public EtcdWatcher(ChildListener listener) {
+            this.listener = listener;
+        }
+
+        @Override
+        public void onNext(WatchResponse response) {
+
+            // prevents grpc on sending watchResponse to a closed watch client.
+            if (!isConnected()) {
+                return;
+            }
+
+            watchId = response.getWatchId();
+
+            if (listener != null) {
+                int modified = 0;
+                String service = null;
+                Iterator<Event> iterator = response.getEventsList().iterator();
+                while (iterator.hasNext()) {
+                    Event event = iterator.next();
+                    switch (event.getType()) {
+                        case PUT: {
+                            if (((service = find(event)) != null)
+                                    && safeUpdate(service, true)) {
+                                modified++;
+                            }
+                            break;
+                        }
+                        case DELETE: {
+                            if (((service = find(event)) != null)
+                                    && safeUpdate(service, false)) {
+                                modified++;
+                            }
+                            break;
+                        }
+                        default:
+                            break;
+                    }
+                }
+                if (modified > 0) {
+                    notifyExecutor.execute(() -> listener.childChanged(path, new ArrayList<>(urls)));
+                }
+
+            }
+        }
+
+        @Override
+        public void onError(Throwable e) {
+            tryReconnect(e);
+        }
+
+        public void unwatch() {
+
+            // prevents grpc on sending watchResponse to a closed watch client.
+            if (!isConnected()) {
+                return;
+            }
+
+            try {
+                /**
+                 * issue : https://github.com/apache/dubbo/issues/4115
+                 *
+                 * When the network is reconnected, the listener is empty
+                 * and the data cannot be received.
+                 */
+                // this.listener = null;
+
+                if (watchRequest != null) {
+                    WatchCancelRequest watchCancelRequest =
+                            WatchCancelRequest.newBuilder().setWatchId(watchId).build();
+                    WatchRequest cancelRequest = WatchRequest.newBuilder()
+                            .setCancelRequest(watchCancelRequest).build();
+                    this.watchRequest.onNext(cancelRequest);
+                }
+            } catch (Exception ignored) {
+                logger.warn("Failed to cancel watch for path '" + path + "'", ignored);
+            }
+        }
+
+        public List<String> forPath(String path) {
+
+            if (!isConnected()) {
+                throw new ClosedClientException("watch client has been closed, path '" + path + "'");
+            }
+            if (this.path != null) {
+                unwatch();
+            }
+
+            this.path = path;
+
+            lock.lock();
+            try {
+
+                this.watchStub = WatchGrpc.newStub(clientWrapper.getChannel());
+                this.watchRequest = watchStub.watch(this);
+                this.watchRequest.onNext(nextRequest());
+
+                List<String> children = clientWrapper.getChildren(path);
+                /**
+                 * caching the current service
+                 */
+                if (!children.isEmpty()) {
+                    this.urls.addAll(filterChildren(children));
+                }
+
+                return new ArrayList<>(urls);
+            } finally {
+                lock.unlock();
+            }
+        }
+
+        private boolean safeUpdate(String service, boolean add) {
+            lock.lock();
+            try {
+                /**
+                 * If the collection already contains the specified service, do nothing
+                 */
+                return add ? this.urls.add(service) : this.urls.remove(service);
+            } finally {
+                lock.unlock();
+            }
+        }
+
+        private String find(Event event) {
+            KeyValue keyValue = event.getKv();
+            String key = keyValue.getKey().toStringUtf8();
+
+            int len = path.length(), index = len, count = 0;
+            if (key.length() >= index) {
+                for (; (index = key.indexOf(PATH_SEPARATOR, index)) != -1; ++index) {
+                    if (count++ > 1) {
+                        break;
+                    }
+                }
+            }
+
+            /**
+             * if children changed , we should refresh invokers
+             */
+            if (count == 1) {
+                /**
+                 * remove prefix
+                 */
+                return key.substring(len + 1);
+            }
+
+            return null;
+        }
+
+        private List<String> filterChildren(List<String> children) {
+            if (children == null) {
+                return Collections.emptyList();
+            }
+            if (children.size() <= 0) {
+                return children;
+            }
+            final int len = path.length();
+            return children.stream().parallel()
+                    .filter(child -> {
+                        int index = len, count = 0;
+                        if (child.length() > len) {
+                            for (; (index = child.indexOf(PATH_SEPARATOR, index)) != -1; ++index) {
+                                if (count++ > 1) {
+                                    break;
+                                }
+                            }
+                        }
+                        return count == 1;
+                    })
+                    .map(child -> child.substring(len + 1))
+                    .collect(toList());
+        }
+
+        /**
+         * create new watching request for current path.
+         */
+        protected WatchRequest nextRequest() {
+
+            WatchCreateRequest.Builder builder = WatchCreateRequest.newBuilder()
+                    .setKey(ByteString.copyFromUtf8(path))
+                    .setRangeEnd(ByteString.copyFrom(
+                            OptionUtil.prefixEndOf(ByteSequence.from(path, UTF_8)).getBytes()))
+                    .setProgressNotify(true);
+
+            return WatchRequest.newBuilder().setCreateRequest(builder).build();
+        }
+
+        public void tryReconnect(Throwable e) {
+
+            this.throwable = e;
+
+            logger.error("watcher client has error occurred, current path '" + path + "'", e);
+
+            // prevents grpc on sending error to a closed watch client.
+            if (!isConnected()) {
+                return;
+            }
+
+
+            Status status = Status.fromThrowable(e);
+            // system may be recover later, current connect won't be lost
+            if (OptionUtil.isHaltError(status) || OptionUtil.isNoLeaderError(status)) {
+                reconnectSchedule.schedule(this::reconnect, new Random().nextInt(delayPeriod), TimeUnit.MILLISECONDS);
+                return;
+            }
+            // reconnect with a delay; avoiding immediate retry on a long connection downtime.
+            reconnectSchedule.schedule(this::reconnect, new Random().nextInt(delayPeriod), TimeUnit.MILLISECONDS);
+        }
+
+        protected synchronized void reconnect() {
+            this.closeWatchRequest();
+            this.recreateWatchRequest();
+        }
+
+        protected void recreateWatchRequest() {
+            if (watchRequest == null) {
+                this.watchStub = WatchGrpc.newStub(clientWrapper.getChannel());
+                this.watchRequest = watchStub.watch(this);
+            }
+            this.watchRequest.onNext(nextRequest());
+            this.throwable = null;
+            logger.warn("watch client retried connect for path '" + path + "', connection status : " + isConnected());
+        }
+
+        protected void closeWatchRequest() {
+            if (this.watchRequest == null) {
+                return;
+            }
+
+            try {
+                WatchCancelRequest watchCancelRequest =
+                        WatchCancelRequest.newBuilder().setWatchId(watchId).build();
+                WatchRequest cancelRequest = WatchRequest.newBuilder()
+                        .setCancelRequest(watchCancelRequest).build();
+                watchRequest.onNext(cancelRequest);
+            } finally {
+                this.watchRequest.onCompleted();
+                this.watchRequest = null;
+            }
+        }
+
+        @Override
+        public void onCompleted() {
+            // do not touch this method, if you want terminate this stream.
+        }
+    }
+}
diff --git a/dubbo-remoting-extensions/dubbo-remoting-etcd3/src/main/java/org/apache/dubbo/remoting/etcd/jetcd/JEtcdClientWrapper.java b/dubbo-remoting-extensions/dubbo-remoting-etcd3/src/main/java/org/apache/dubbo/remoting/etcd/jetcd/JEtcdClientWrapper.java
new file mode 100644
index 0000000..286586f
--- /dev/null
+++ b/dubbo-remoting-extensions/dubbo-remoting-etcd3/src/main/java/org/apache/dubbo/remoting/etcd/jetcd/JEtcdClientWrapper.java
@@ -0,0 +1,752 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dubbo.remoting.etcd.jetcd;
+
+import org.apache.dubbo.common.URL;
+import org.apache.dubbo.common.logger.Logger;
+import org.apache.dubbo.common.logger.LoggerFactory;
+import org.apache.dubbo.common.utils.ConcurrentHashSet;
+import org.apache.dubbo.common.utils.NamedThreadFactory;
+import org.apache.dubbo.common.utils.ReflectUtils;
+import org.apache.dubbo.common.utils.StringUtils;
+import org.apache.dubbo.remoting.etcd.RetryPolicy;
+import org.apache.dubbo.remoting.etcd.StateListener;
+
+import io.etcd.jetcd.ByteSequence;
+import io.etcd.jetcd.Client;
+import io.etcd.jetcd.ClientBuilder;
+import io.etcd.jetcd.KeyValue;
+import io.etcd.jetcd.common.exception.ErrorCode;
+import io.etcd.jetcd.common.exception.EtcdException;
+import io.etcd.jetcd.kv.GetResponse;
+import io.etcd.jetcd.kv.PutResponse;
+import io.etcd.jetcd.lease.LeaseKeepAliveResponse;
+import io.etcd.jetcd.options.GetOption;
+import io.etcd.jetcd.options.PutOption;
+import io.etcd.jetcd.support.CloseableClient;
+import io.etcd.jetcd.support.Observers;
+import io.grpc.ConnectivityState;
+import io.grpc.ManagedChannel;
+import io.grpc.Status;
+import io.grpc.stub.StreamObserver;
+
+import java.lang.reflect.Field;
+import java.lang.reflect.Method;
+import java.nio.charset.Charset;
+import java.nio.charset.StandardCharsets;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Consumer;
+
+import static java.util.stream.Collectors.toList;
+import static org.apache.dubbo.common.constants.CommonConstants.COMMA_SEPARATOR;
+import static org.apache.dubbo.common.constants.CommonConstants.PATH_SEPARATOR;
+import static org.apache.dubbo.remoting.etcd.Constants.DEFAULT_KEEPALIVE_TIMEOUT;
+import static org.apache.dubbo.remoting.etcd.Constants.DEFAULT_RECONNECT_PERIOD;
+import static org.apache.dubbo.remoting.etcd.Constants.DEFAULT_RETRY_PERIOD;
+import static org.apache.dubbo.remoting.etcd.Constants.HTTP_KEY;
+import static org.apache.dubbo.remoting.etcd.Constants.HTTP_SUBFIX_KEY;
+import static org.apache.dubbo.remoting.etcd.Constants.RETRY_PERIOD_KEY;
+import static org.apache.dubbo.remoting.etcd.Constants.SESSION_TIMEOUT_KEY;
+
+public class JEtcdClientWrapper {
+
+    private Logger logger = LoggerFactory.getLogger(JEtcdClientWrapper.class);
+
+    private final URL url;
+    private volatile Client client;
+    private volatile boolean started = false;
+    private volatile boolean connectState = false;
+    private ScheduledFuture future;
+    private ScheduledExecutorService reconnectNotify;
+    private AtomicReference<ManagedChannel> channel;
+
+    private ConnectionStateListener connectionStateListener;
+
+    private long expirePeriod;
+
+    private CompletableFuture<Client> completableFuture;
+
+    private RetryPolicy retryPolicy;
+
+    private RuntimeException failed;
+
+    private final ScheduledFuture<?> retryFuture;
+    private final ScheduledExecutorService retryExecutor = Executors.newScheduledThreadPool(1,
+            new NamedThreadFactory("Etcd3RegistryKeepAliveFailedRetryTimer", true));
+
+    private final Set<String> failedRegistered = new ConcurrentHashSet<String>();
+
+    private final Set<String> registeredPaths = new ConcurrentHashSet<>();
+    private volatile CloseableClient keepAlive = null;
+
+    /**
+     * Support temporary nodes to reuse the same lease
+     */
+    private volatile long globalLeaseId;
+
+    private volatile boolean cancelKeepAlive = false;
+
+    public static final Charset UTF_8 = StandardCharsets.UTF_8;
+
+    public JEtcdClientWrapper(URL url) {
+        this.url = url;
+        this.expirePeriod = url.getParameter(SESSION_TIMEOUT_KEY, DEFAULT_KEEPALIVE_TIMEOUT) / 1000;
+        if (expirePeriod <= 0) {
+            this.expirePeriod = DEFAULT_KEEPALIVE_TIMEOUT / 1000;
+        }
+        this.channel = new AtomicReference<>();
+        this.completableFuture = CompletableFuture.supplyAsync(() -> prepareClient(url));
+        this.reconnectNotify = Executors.newScheduledThreadPool(1,
+                new NamedThreadFactory("reconnectNotify", true));
+        this.retryPolicy = new RetryNTimes(1, 1000, TimeUnit.MILLISECONDS);
+
+        this.failed = new IllegalStateException("Etcd3 registry is not connected yet, url:" + url);
+        int retryPeriod = url.getParameter(RETRY_PERIOD_KEY, DEFAULT_RETRY_PERIOD);
+
+        this.retryFuture = retryExecutor.scheduleWithFixedDelay(() -> {
+            try {
+                retry();
+            } catch (Throwable t) {
+                logger.error("Unexpected error occur at failed retry, cause: " + t.getMessage(), t);
+            }
+        }, retryPeriod, retryPeriod, TimeUnit.MILLISECONDS);
+    }
+
+    private Client prepareClient(URL url) {
+
+        int maxInboundSize = DEFAULT_INBOUND_SIZE;
+        if (StringUtils.isNotEmpty(System.getProperty(GRPC_MAX_INBOUND_SIZE_KEY))) {
+            maxInboundSize = Integer.valueOf(System.getProperty(GRPC_MAX_INBOUND_SIZE_KEY));
+        }
+
+        // TODO, uses default pick-first round robin.
+        ClientBuilder clientBuilder = Client.builder()
+                .endpoints(endPoints(url.getBackupAddress()))
+                .maxInboundMessageSize(maxInboundSize);
+
+        return clientBuilder.build();
+    }
+
+    public Client getClient() {
+        return client;
+    }
+
+    /**
+     * try to get current connected channel.
+     *
+     * @return connected channel.
+     */
+    public ManagedChannel getChannel() {
+        if (channel.get() == null || (channel.get().isShutdown() || channel.get().isTerminated())) {
+            channel.set(newChannel(client));
+        }
+        return channel.get();
+    }
+
+    /**
+     * find direct children directory, excluding path self,
+     * Never return null.
+     *
+     * @param path the path to be found direct children.
+     * @return direct children directory, contains zero element
+     * list if children directory not exists.
+     */
+    public List<String> getChildren(String path) {
+        try {
+            return RetryLoops.invokeWithRetry(
+                    () -> {
+                        requiredNotNull(client, failed);
+                        int len = path.length();
+                        return client.getKVClient()
+                                .get(ByteSequence.from(path, UTF_8),
+                                        GetOption.newBuilder().withPrefix(ByteSequence.from(path, UTF_8)).build())
+                                .get(DEFAULT_REQUEST_TIMEOUT, TimeUnit.MILLISECONDS)
+                                .getKvs().stream().parallel()
+                                .filter(pair -> {
+                                    String key = pair.getKey().toString(UTF_8);
+                                    int index = len, count = 0;
+                                    if (key.length() > len) {
+                                        for (; (index = key.indexOf(PATH_SEPARATOR, index)) != -1; ++index) {
+                                            if (count++ > 1) {
+                                                break;
+                                            }
+                                        }
+                                    }
+                                    return count == 1;
+                                })
+                                .map(pair -> pair.getKey().toString(UTF_8))
+                                .collect(toList());
+                    }, retryPolicy);
+        } catch (Exception e) {
+            throw new IllegalStateException(e.getMessage(), e);
+        }
+    }
+
+    public boolean isConnected() {
+        return ConnectivityState.READY == (getChannel().getState(false))
+                || ConnectivityState.IDLE == (getChannel().getState(false));
+    }
+
+    public long createLease(long second) {
+        try {
+            return RetryLoops.invokeWithRetry(
+                    () -> {
+                        requiredNotNull(client, failed);
+                        return client.getLeaseClient()
+                                .grant(second)
+                                .get(DEFAULT_REQUEST_TIMEOUT, TimeUnit.MILLISECONDS)
+                                .getID();
+                    }, retryPolicy);
+        } catch (Exception e) {
+            throw new IllegalStateException(e.getMessage(), e);
+        }
+    }
+
+    public void revokeLease(long lease) {
+        try {
+            RetryLoops.invokeWithRetry(
+                    (Callable<Void>) () -> {
+                        requiredNotNull(client, failed);
+                        client.getLeaseClient()
+                                .revoke(lease)
+                                .get(DEFAULT_REQUEST_TIMEOUT, TimeUnit.MILLISECONDS);
+                        return null;
+                    }, retryPolicy);
+        } catch (Exception e) {
+            throw new IllegalStateException(e.getMessage(), e);
+        }
+    }
+
+    public long createLease(long ttl, long timeout, TimeUnit unit)
+            throws InterruptedException, ExecutionException, TimeoutException {
+
+        if (timeout <= 0) {
+            return createLease(ttl);
+        }
+
+        requiredNotNull(client, failed);
+        return client.getLeaseClient()
+                .grant(ttl)
+                .get(timeout, unit).getID();
+    }
+
+
+    /**
+     * try to check if path exists.
+     */
+    public boolean checkExists(String path) {
+        try {
+            return RetryLoops.invokeWithRetry(
+                    () -> {
+                        requiredNotNull(client, failed);
+                        return client.getKVClient()
+                                .get(ByteSequence.from(path, UTF_8), GetOption.newBuilder().withCountOnly(true).build())
+                                .get(DEFAULT_REQUEST_TIMEOUT, TimeUnit.MILLISECONDS)
+                                .getCount() > 0;
+                    }, retryPolicy);
+        } catch (Exception e) {
+            throw new IllegalStateException(e.getMessage(), e);
+        }
+    }
+
+    /**
+     * only internal use only, maybe change in the future
+     */
+    protected Long find(String path) {
+        try {
+            return RetryLoops.invokeWithRetry(
+                    () -> {
+                        requiredNotNull(client, failed);
+                        return client.getKVClient()
+                                .get(ByteSequence.from(path, UTF_8))
+                                .get(DEFAULT_REQUEST_TIMEOUT, TimeUnit.MILLISECONDS)
+                                .getKvs().stream()
+                                .mapToLong(keyValue -> Long.valueOf(keyValue.getValue().toString(UTF_8)))
+                                .findFirst().getAsLong();
+                    }, retryPolicy);
+        } catch (Exception e) {
+            throw new IllegalStateException(e.getMessage(), e);
+        }
+    }
+
+    public void createPersistent(String path) {
+        try {
+            RetryLoops.invokeWithRetry(
+                    (Callable<Void>) () -> {
+                        requiredNotNull(client, failed);
+                        client.getKVClient()
+                                .put(ByteSequence.from(path, UTF_8),
+                                        ByteSequence.from(String.valueOf(path.hashCode()), UTF_8))
+                                .get(DEFAULT_REQUEST_TIMEOUT, TimeUnit.MILLISECONDS);
+                        return null;
+                    }, retryPolicy);
+        } catch (Exception e) {
+            throw new IllegalStateException(e.getMessage(), e);
+        }
+    }
+
+    /**
+     * create new ephemeral path save to etcd .
+     * if node disconnect from etcd, it will be deleted
+     * automatically by etcd when session timeout.
+     *
+     * @param path the path to be saved
+     * @return the lease of current path.
+     */
+    public long createEphemeral(String path) {
+        try {
+            return RetryLoops.invokeWithRetry(
+                    () -> {
+                        requiredNotNull(client, failed);
+
+                        registeredPaths.add(path);
+                        keepAlive();
+                        final long leaseId = globalLeaseId;
+                        client.getKVClient()
+                                .put(ByteSequence.from(path, UTF_8)
+                                        , ByteSequence.from(String.valueOf(leaseId), UTF_8)
+                                        , PutOption.newBuilder().withLeaseId(leaseId).build())
+                                .get(DEFAULT_REQUEST_TIMEOUT, TimeUnit.MILLISECONDS);
+                        return leaseId;
+                    }, retryPolicy);
+        } catch (Exception e) {
+            throw new IllegalStateException(e.getMessage(), e);
+        }
+    }
+
+    // easy for mock
+    public void keepAlive(long lease) {
+        this.keepAlive(lease, null);
+    }
+
+    @SuppressWarnings("unchecked")
+    private <T> void keepAlive(long lease, Consumer<T> onFailed) {
+        final StreamObserver<LeaseKeepAliveResponse> observer = new Observers.Builder()
+                .onError((e) -> {
+                    if (e instanceof EtcdException) {
+                        EtcdException error = (EtcdException) e;
+                        /**
+                         * ttl has expired
+                         */
+                        if (error.getErrorCode() == ErrorCode.NOT_FOUND) {
+                            keepAlive0(onFailed);
+                        }
+                    }
+                }).onCompleted(() -> {
+                    /**
+                     * deadline reached.
+                     */
+                    keepAlive0(onFailed);
+                }).build();
+
+        /**
+         * If there is already a keepalive, cancel first
+         */
+        cancelKeepAlive();
+
+        /**
+         * create and set new keepAlive to globalKeepAliveRef
+         */
+        this.keepAlive = client.getLeaseClient().keepAlive(lease, observer);
+    }
+
+    private void keepAlive() throws Exception {
+        if (keepAlive == null) {
+            synchronized (this) {
+                if (keepAlive == null) {
+                    this.globalLeaseId = client.getLeaseClient()
+                            .grant(expirePeriod)
+                            .get(DEFAULT_REQUEST_TIMEOUT, TimeUnit.MILLISECONDS)
+                            .getID();
+                    /**
+                     * If the keepAlive expires, the registration will be re-attempted
+                     */
+                    keepAlive(globalLeaseId, (NULL) -> recovery());
+                }
+            }
+        }
+    }
+
+    private <T> void keepAlive0(Consumer<T> onFailed) {
+        if (onFailed != null) {
+
+            /**
+             * The following two scenarios will cause the keep-alive failure:
+             *
+             * 1. Service is offline
+             * 2. Local deadline check expired
+             *
+             * The multiplex lease cannot update the local deadline,
+             * causing the extreme scene service to be dropped.
+             *
+             */
+            long leaseId = globalLeaseId;
+            try {
+                if (logger.isWarnEnabled()) {
+                    logger.warn("Failed to keep alive for global lease '" + leaseId + "', waiting for retry again.");
+                }
+                onFailed.accept(null);
+            } catch (Exception ignored) {
+                logger.warn("Failed to recover from global lease expired or lease deadline exceeded. lease '" + leaseId + "'", ignored);
+            }
+        }
+    }
+
+    private void recovery() {
+
+        try {
+            /**
+             * The client is processing reconnection
+             */
+            if (cancelKeepAlive) {
+                return;
+            }
+
+            cancelKeepAlive();
+
+            Set<String> ephemeralPaths = new HashSet<String>(registeredPaths);
+            if (!ephemeralPaths.isEmpty()) {
+                for (String path : ephemeralPaths) {
+                    try {
+
+                        /**
+                         * The client is processing reconnection,
+                         * cancel remaining service registration
+                         */
+                        if (cancelKeepAlive) {
+                            return;
+                        }
+
+                        createEphemeral(path);
+                        failedRegistered.remove(path);
+                    } catch (Exception e) {
+
+                        /**
+                         * waiting for retry again
+                         */
+                        failedRegistered.add(path);
+
+                        Status status = Status.fromThrowable(e);
+                        if (status.getCode() == Status.Code.NOT_FOUND) {
+                            cancelKeepAlive();
+                        }
+                    }
+                }
+            }
+        } catch (Throwable t) {
+            logger.warn("Unexpected error, failed to recover from global lease expired or deadline exceeded.", t);
+        }
+    }
+
+    public void delete(String path) {
+        try {
+            RetryLoops.invokeWithRetry(
+                    (Callable<Void>) () -> {
+                        requiredNotNull(client, failed);
+                        client.getKVClient()
+                                .delete(ByteSequence.from(path, UTF_8))
+                                .get(DEFAULT_REQUEST_TIMEOUT, TimeUnit.MILLISECONDS);
+                        registeredPaths.remove(path);
+                        return null;
+                    }, retryPolicy);
+        } catch (Exception e) {
+            throw new IllegalStateException(e.getMessage(), e);
+        } finally {
+            /**
+             * Cancel retry
+             */
+            failedRegistered.remove(path);
+        }
+    }
+
+    public String[] endPoints(String backupAddress) {
+        String[] endpoints = backupAddress.split(COMMA_SEPARATOR);
+        List<String> addresses = Arrays.stream(endpoints)
+                .map(address -> address.contains(HTTP_SUBFIX_KEY)
+                        ? address
+                        : HTTP_KEY + address)
+                .collect(toList());
+        Collections.shuffle(addresses);
+        return addresses.toArray(new String[0]);
+    }
+
+    /**
+     * because jetcd's connection change callback not supported yet, we must
+     * loop to test if connect or disconnect event happened or not. It will be changed
+     * in the future if we found better choice.
+     */
+    public void start() {
+        if (!started) {
+            try {
+                this.client = completableFuture.get(expirePeriod, TimeUnit.SECONDS);
+                this.connectState = isConnected();
+                this.started = true;
+            } catch (Throwable t) {
+                logger.error("Timeout! etcd3 server can not be connected in : " + expirePeriod + " seconds! url: " + url, t);
+
+                completableFuture.whenComplete((c, e) -> {
+                    this.client = c;
+                    if (e != null) {
+                        logger.error("Got an exception when trying to create etcd3 instance, can not connect to etcd3 server, url: " + url, e);
+                    }
+                });
+
+            }
+
+            try {
+                this.future = reconnectNotify.scheduleWithFixedDelay(() -> {
+                    boolean connected = isConnected();
+                    if (connectState != connected) {
+                        int notifyState = connected ? StateListener.CONNECTED : StateListener.DISCONNECTED;
+                        if (connectionStateListener != null) {
+                            try {
+                                if (connected) {
+                                    clearKeepAlive();
+                                }
+                                connectionStateListener.stateChanged(getClient(), notifyState);
+                            } finally {
+                                cancelKeepAlive = false;
+                            }
+                        }
+                        connectState = connected;
+                    }
+                }, DEFAULT_RECONNECT_PERIOD, DEFAULT_RECONNECT_PERIOD, TimeUnit.MILLISECONDS);
+            } catch (Throwable t) {
+                logger.error("monitor reconnect status failed.", t);
+            }
+        }
+    }
+
+    private void cancelKeepAlive() {
+        try {
+            if (keepAlive != null) {
+                keepAlive.close();
+            }
+        } finally {
+            // help for gc
+            keepAlive = null;
+        }
+    }
+
+    private void clearKeepAlive() {
+        cancelKeepAlive = true;
+        failedRegistered.clear();
+        cancelKeepAlive();
+    }
+
+    protected void doClose() {
+
+        try {
+            cancelKeepAlive = true;
+            if (globalLeaseId != 0) {
+                revokeLease(this.globalLeaseId);
+            }
+        } catch (Exception e) {
+            logger.warn("revoke global lease '" + globalLeaseId + "' failed, registry: " + url, e);
+        }
+
+        try {
+            if (started && future != null) {
+                started = false;
+                future.cancel(true);
+                reconnectNotify.shutdownNow();
+            }
+        } catch (Exception e) {
+            logger.warn("stop reconnect Notify failed, registry: " + url, e);
+        }
+
+        try {
+            retryFuture.cancel(true);
+            retryExecutor.shutdownNow();
+        } catch (Throwable t) {
+            logger.warn(t.getMessage(), t);
+        }
+
+        if (getClient() != null) {
+            getClient().close();
+        }
+    }
+
+    /**
+     * try get client's shared channel, because all fields is private on jetcd,
+     * we must using it by reflect, in the future, jetcd may provider better tools.
+     *
+     * @param client get channel from current client
+     * @return current connection channel
+     */
+    private ManagedChannel newChannel(Client client) {
+        try {
+            Field connectionField = client.getClass().getDeclaredField("connectionManager");
+            connectionField.setAccessible(true);
+            Object connection = connectionField.get(client);
+            Method channel = connection.getClass().getDeclaredMethod("getChannel");
+            ReflectUtils.makeAccessible(channel);
+            return (ManagedChannel) channel.invoke(connection);
+        } catch (Exception e) {
+            throw new RuntimeException("Failed to obtain connection channel from " + url.getBackupAddress(), e);
+        }
+    }
+
+    public ConnectionStateListener getConnectionStateListener() {
+        return connectionStateListener;
+    }
+
+    public void setConnectionStateListener(ConnectionStateListener connectionStateListener) {
+        this.connectionStateListener = connectionStateListener;
+    }
+
+    public static void requiredNotNull(Object obj, RuntimeException exception) {
+        if (obj == null) {
+            throw exception;
+        }
+    }
+
+    public String getKVValue(String key) {
+        if (null == key) {
+            return null;
+        }
+
+        CompletableFuture<GetResponse> responseFuture = this.client.getKVClient().get(ByteSequence.from(key, UTF_8));
+
+        try {
+            List<KeyValue> result = responseFuture.get(DEFAULT_REQUEST_TIMEOUT, TimeUnit.MILLISECONDS).getKvs();
+            if (!result.isEmpty()) {
+                return result.get(0).getValue().toString(UTF_8);
+            }
+        } catch (Exception e) {
+            // ignore
+        }
+
+        return null;
+    }
+
+
+    public boolean put(String key, String value) {
+        if (key == null || value == null) {
+            return false;
+        }
+        CompletableFuture<PutResponse> putFuture =
+                this.client.getKVClient().put(ByteSequence.from(key, UTF_8), ByteSequence.from(value, UTF_8));
+        try {
+            putFuture.get(DEFAULT_REQUEST_TIMEOUT, TimeUnit.MILLISECONDS);
+            return true;
+        } catch (Exception e) {
+            // ignore
+        }
+        return false;
+    }
+
+    public boolean putEphemeral(final String key, String value) {
+        try {
+            return RetryLoops.invokeWithRetry(
+                    () -> {
+                        requiredNotNull(client, failed);
+                        // recovery an retry
+                        keepAlive();
+                        final long leaseId = globalLeaseId;
+                        client.getKVClient()
+                                .put(ByteSequence.from(key, UTF_8)
+                                        , ByteSequence.from(String.valueOf(value), UTF_8)
+                                        , PutOption.newBuilder().withLeaseId(leaseId).build())
+                                .get(DEFAULT_REQUEST_TIMEOUT, TimeUnit.MILLISECONDS);
+                        return true;
+                    }, retryPolicy);
+        } catch (Exception e) {
+            throw new IllegalStateException(e.getMessage(), e);
+        }
+    }
+
+    private void retry() {
+        if (!failedRegistered.isEmpty()) {
+            Set<String> failed = new HashSet<String>(failedRegistered);
+            if (!failed.isEmpty()) {
+
+                if (cancelKeepAlive) {
+                    return;
+                }
+
+                if (logger.isWarnEnabled()) {
+                    logger.warn("Retry failed register(keep alive) for path '" + failed
+                            + "', path size: " + failed.size());
+                }
+                try {
+                    for (String path : failed) {
+                        try {
+
+                            /**
+                             * Is it currently reconnecting ?
+                             */
+                            if (cancelKeepAlive) {
+                                return;
+                            }
+
+                            createEphemeral(path);
+                            failedRegistered.remove(path);
+                        } catch (Throwable e) {
+
+                            failedRegistered.add(path);
+
+                            Status status = Status.fromThrowable(e);
+                            if (status.getCode() == Status.Code.NOT_FOUND) {
+                                cancelKeepAlive();
+                            }
+
+                            logger.warn("Failed to retry register(keep alive) for path '" + path + "', waiting for again, cause: " + e.getMessage(), e);
+                        }
+                    }
+                } catch (Throwable t) {
+                    logger.warn("Failed to retry register(keep alive) for path '" + failed + "', waiting for again, cause: " + t.getMessage(), t);
+                }
+            }
+        }
+    }
+
+    /**
+     * default request timeout
+     */
+    public static final long DEFAULT_REQUEST_TIMEOUT = obtainRequestTimeout();
+
+    public static final int DEFAULT_INBOUND_SIZE = 100 * 1024 * 1024;
+
+    public static final String GRPC_MAX_INBOUND_SIZE_KEY = "grpc.max.inbound.size";
+
+    public static final String ETCD_REQUEST_TIMEOUT_KEY = "etcd.request.timeout";
+
+    private static int obtainRequestTimeout() {
+        if (StringUtils.isNotEmpty(System.getProperty(ETCD_REQUEST_TIMEOUT_KEY))) {
+            return Integer.valueOf(System.getProperty(ETCD_REQUEST_TIMEOUT_KEY));
+        }
+        /**
+         * 10 seconds.
+         */
+        return 10 * 1000;
+    }
+}
diff --git a/dubbo-remoting-extensions/dubbo-remoting-etcd3/src/main/java/org/apache/dubbo/remoting/etcd/jetcd/JEtcdTransporter.java b/dubbo-remoting-extensions/dubbo-remoting-etcd3/src/main/java/org/apache/dubbo/remoting/etcd/jetcd/JEtcdTransporter.java
new file mode 100644
index 0000000..5ddec8e
--- /dev/null
+++ b/dubbo-remoting-extensions/dubbo-remoting-etcd3/src/main/java/org/apache/dubbo/remoting/etcd/jetcd/JEtcdTransporter.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dubbo.remoting.etcd.jetcd;
+
+import org.apache.dubbo.common.URL;
+import org.apache.dubbo.remoting.etcd.EtcdClient;
+import org.apache.dubbo.remoting.etcd.EtcdTransporter;
+
+public class JEtcdTransporter implements EtcdTransporter {
+
+    @Override
+    public EtcdClient connect(URL url) {
+        return new JEtcdClient(url);
+    }
+
+}
diff --git a/dubbo-remoting-extensions/dubbo-remoting-etcd3/src/main/java/org/apache/dubbo/remoting/etcd/jetcd/RetryLoops.java b/dubbo-remoting-extensions/dubbo-remoting-etcd3/src/main/java/org/apache/dubbo/remoting/etcd/jetcd/RetryLoops.java
new file mode 100644
index 0000000..409d243
--- /dev/null
+++ b/dubbo-remoting-extensions/dubbo-remoting-etcd3/src/main/java/org/apache/dubbo/remoting/etcd/jetcd/RetryLoops.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dubbo.remoting.etcd.jetcd;
+
+import org.apache.dubbo.common.logger.Logger;
+import org.apache.dubbo.common.logger.LoggerFactory;
+import org.apache.dubbo.remoting.etcd.RetryPolicy;
+import org.apache.dubbo.remoting.etcd.option.OptionUtil;
+
+import io.grpc.Status;
+
+import java.util.concurrent.Callable;
+
+public class RetryLoops {
+
+    private final long startTimeMs = System.currentTimeMillis();
+    private boolean isDone = false;
+    private int retriedCount = 0;
+    private Logger logger = LoggerFactory.getLogger(RetryLoops.class);
+
+    public static <R> R invokeWithRetry(Callable<R> task, RetryPolicy retryPolicy) throws Exception {
+        R result = null;
+        RetryLoops retryLoop = new RetryLoops();
+        while (retryLoop.shouldContinue()) {
+            try {
+                result = task.call();
+                retryLoop.complete();
+            } catch (Exception e) {
+                retryLoop.fireException(e, retryPolicy);
+            }
+        }
+        return result;
+    }
+
+    public void fireException(Exception e, RetryPolicy retryPolicy) throws Exception {
+
+        if (e instanceof InterruptedException) {
+            Thread.currentThread().interrupt();
+        }
+
+        boolean rethrow = true;
+        if (isRetryException(e)
+                && retryPolicy.shouldRetry(retriedCount++, System.currentTimeMillis() - startTimeMs, true)) {
+            rethrow = false;
+        }
+
+        if (rethrow) {
+            throw e;
+        }
+    }
+
+    private boolean isRetryException(Throwable e) {
+        Status status = Status.fromThrowable(e);
+        if (OptionUtil.isRecoverable(status)) {
+            return true;
+        }
+
+        return false;
+    }
+
+    public boolean shouldContinue() {
+        return !isDone;
+    }
+
+    public void complete() {
+        isDone = true;
+    }
+
+}
diff --git a/dubbo-remoting-extensions/dubbo-remoting-etcd3/src/main/java/org/apache/dubbo/remoting/etcd/jetcd/RetryNTimes.java b/dubbo-remoting-extensions/dubbo-remoting-etcd3/src/main/java/org/apache/dubbo/remoting/etcd/jetcd/RetryNTimes.java
new file mode 100644
index 0000000..7453208
--- /dev/null
+++ b/dubbo-remoting-extensions/dubbo-remoting-etcd3/src/main/java/org/apache/dubbo/remoting/etcd/jetcd/RetryNTimes.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dubbo.remoting.etcd.jetcd;
+
+import org.apache.dubbo.remoting.etcd.AbstractRetryPolicy;
+
+import java.util.concurrent.TimeUnit;
+
+public class RetryNTimes extends AbstractRetryPolicy {
+
+    private final long sleepMilliseconds;
+
+    public RetryNTimes(int maxRetried, int sleepTime, TimeUnit unit) {
+        super(maxRetried);
+        this.sleepMilliseconds = unit.convert(sleepTime, TimeUnit.MILLISECONDS);
+    }
+
+    @Override
+    protected long getSleepTime(int retried, long elapsed) {
+        return sleepMilliseconds;
+    }
+}
diff --git a/dubbo-remoting-extensions/dubbo-remoting-etcd3/src/main/java/org/apache/dubbo/remoting/etcd/option/OptionUtil.java b/dubbo-remoting-extensions/dubbo-remoting-etcd3/src/main/java/org/apache/dubbo/remoting/etcd/option/OptionUtil.java
new file mode 100644
index 0000000..fa5955c
--- /dev/null
+++ b/dubbo-remoting-extensions/dubbo-remoting-etcd3/src/main/java/org/apache/dubbo/remoting/etcd/option/OptionUtil.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dubbo.remoting.etcd.option;
+
+import io.etcd.jetcd.ByteSequence;
+import io.grpc.Status;
+import io.netty.handler.codec.http2.Http2Exception;
+
+public class OptionUtil {
+
+    public static final byte[] NO_PREFIX_END = {0};
+
+    public static final ByteSequence prefixEndOf(ByteSequence prefix) {
+        byte[] endKey = prefix.getBytes().clone();
+        if (prefix.size() > 0) {
+            endKey[endKey.length - 1] = (byte) (endKey[endKey.length - 1] + 1);
+            return ByteSequence.from(endKey);
+        }
+        return ByteSequence.from(NO_PREFIX_END);
+    }
+
+    public static boolean isRecoverable(Status status) {
+        return isHaltError(status)
+                || isNoLeaderError(status)
+                // ephemeral is expired
+                || status.getCode() == Status.Code.NOT_FOUND;
+    }
+
+    public static boolean isHaltError(Status status) {
+        // Unavailable codes mean the system will be right back.
+        // (e.g., can't connect, lost leader)
+        // Treat Internal codes as if something failed, leaving the
+        // system in an inconsistent state, but retrying could make progress.
+        // (e.g., failed in middle of send, corrupted frame)
+        return status.getCode() != Status.Code.UNAVAILABLE && status.getCode() != Status.Code.INTERNAL;
+    }
+
+    public static boolean isNoLeaderError(Status status) {
+        return status.getCode() == Status.Code.UNAVAILABLE
+                && "etcdserver: no leader".equals(status.getDescription());
+    }
+
+    public static boolean isProtocolError(Throwable e) {
+        if (e == null) {
+            return false;
+        }
+        Throwable cause = e.getCause();
+        while (cause != null) {
+            if (cause instanceof Http2Exception) {
+                Http2Exception t = (Http2Exception) cause;
+                if ("PROTOCOL_ERROR".equals(t.error().name())) {
+                    return true;
+                }
+            }
+            cause = cause.getCause();
+        }
+        return false;
+    }
+}
diff --git a/dubbo-remoting-extensions/dubbo-remoting-etcd3/src/main/java/org/apache/dubbo/remoting/etcd/support/AbstractEtcdClient.java b/dubbo-remoting-extensions/dubbo-remoting-etcd3/src/main/java/org/apache/dubbo/remoting/etcd/support/AbstractEtcdClient.java
new file mode 100644
index 0000000..430bc54
--- /dev/null
+++ b/dubbo-remoting-extensions/dubbo-remoting-etcd3/src/main/java/org/apache/dubbo/remoting/etcd/support/AbstractEtcdClient.java
@@ -0,0 +1,194 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dubbo.remoting.etcd.support;
+
+import org.apache.dubbo.common.URL;
+import org.apache.dubbo.common.logger.Logger;
+import org.apache.dubbo.common.logger.LoggerFactory;
+import org.apache.dubbo.common.utils.ConcurrentHashSet;
+import org.apache.dubbo.common.utils.StringUtils;
+import org.apache.dubbo.remoting.etcd.ChildListener;
+import org.apache.dubbo.remoting.etcd.EtcdClient;
+import org.apache.dubbo.remoting.etcd.StateListener;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+
+import static org.apache.dubbo.common.constants.CommonConstants.PATH_SEPARATOR;
+import static org.apache.dubbo.remoting.etcd.Constants.CONFIGURATORS_CATEGORY;
+import static org.apache.dubbo.remoting.etcd.Constants.CONSUMERS_CATEGORY;
+import static org.apache.dubbo.remoting.etcd.Constants.PROVIDERS_CATEGORY;
+import static org.apache.dubbo.remoting.etcd.Constants.ROUTERS_CATEGORY;
+
+public abstract class AbstractEtcdClient<WatcherListener> implements EtcdClient {
+
+    protected static final Logger logger = LoggerFactory.getLogger(AbstractEtcdClient.class);
+
+    private final URL url;
+
+    private final Set<StateListener> stateListeners = new ConcurrentHashSet<>();
+
+    private final ConcurrentMap<String, ConcurrentMap<ChildListener, WatcherListener>> childListeners = new ConcurrentHashMap<>();
+    private final List<String> categories = Arrays.asList(PROVIDERS_CATEGORY, CONSUMERS_CATEGORY, ROUTERS_CATEGORY,
+            CONFIGURATORS_CATEGORY);
+    private volatile boolean closed = false;
+
+    public AbstractEtcdClient(URL url) {
+        this.url = url;
+    }
+
+    @Override
+    public URL getUrl() {
+        return url;
+    }
+
+    @Override
+    public void create(String path) {
+        String fixedPath = fixNamespace(path);
+        createParentIfAbsent(fixedPath);
+        doCreatePersistent(fixedPath);
+    }
+
+    @Override
+    public long createEphemeral(String path) {
+        String fixedPath = fixNamespace(path);
+        createParentIfAbsent(fixedPath);
+        return doCreateEphemeral(path);
+    }
+
+    @Override
+    public void addStateListener(StateListener listener) {
+        stateListeners.add(listener);
+    }
+
+    @Override
+    public void removeStateListener(StateListener listener) {
+        stateListeners.remove(listener);
+    }
+
+    public Set<StateListener> getSessionListeners() {
+        return stateListeners;
+    }
+
+    @Override
+    public List<String> addChildListener(String path, final ChildListener listener) {
+        ConcurrentMap<ChildListener, WatcherListener> listeners = childListeners.computeIfAbsent(path, k -> new ConcurrentHashMap<>());
+        WatcherListener targetListener = listeners.computeIfAbsent(listener, k -> createChildWatcherListener(path, k));
+        return addChildWatcherListener(path, targetListener);
+    }
+
+    @Override
+    public WatcherListener getChildListener(String path, ChildListener listener) {
+        ConcurrentMap<ChildListener, WatcherListener> listeners = childListeners.get(path);
+        if (listeners == null) {
+            return null;
+        }
+        return listeners.computeIfAbsent(listener, k -> createChildWatcherListener(path, k));
+    }
+
+    @Override
+    public void removeChildListener(String path, ChildListener listener) {
+        ConcurrentMap<ChildListener, WatcherListener> listeners = childListeners.get(path);
+        if (listeners != null) {
+            WatcherListener targetListener = listeners.remove(listener);
+            if (targetListener != null) {
+                removeChildWatcherListener(path, targetListener);
+            }
+        }
+    }
+
+    protected void stateChanged(int state) {
+        for (StateListener sessionListener : getSessionListeners()) {
+            sessionListener.stateChanged(state);
+        }
+    }
+
+    protected String fixNamespace(String path) {
+        if (StringUtils.isEmpty(path)) {
+            throw new IllegalArgumentException("path is required, actual null or ''");
+        }
+        return (path.charAt(0) != '/') ? (PATH_SEPARATOR + path) : path;
+    }
+
+    protected void createParentIfAbsent(String fixedPath) {
+        int i = fixedPath.lastIndexOf('/');
+        if (i > 0) {
+            String parentPath = fixedPath.substring(0, i);
+            if (categories.stream().anyMatch(c -> fixedPath.endsWith(c))) {
+                if (!checkExists(parentPath)) {
+                    this.doCreatePersistent(parentPath);
+                }
+            } else if (categories.stream().anyMatch(c -> parentPath.endsWith(c))) {
+                String grandfather = parentPath.substring(0, parentPath.lastIndexOf('/'));
+                if (!checkExists(grandfather)) {
+                    this.doCreatePersistent(grandfather);
+                }
+            }
+        }
+    }
+
+    @Override
+    public void close() {
+        if (closed) {
+            return;
+        }
+        closed = true;
+        try {
+            doClose();
+        } catch (Throwable t) {
+            logger.warn(t.getMessage(), t);
+        }
+    }
+
+    public abstract void doClose();
+
+    public abstract void doCreatePersistent(String path);
+
+    public abstract long doCreateEphemeral(String path);
+
+    @Override
+    public abstract void delete(String path);
+
+    public abstract boolean checkExists(String path);
+
+    public abstract WatcherListener createChildWatcherListener(String path, ChildListener listener);
+
+    public abstract List<String> addChildWatcherListener(String path, WatcherListener listener);
+
+    public abstract void removeChildWatcherListener(String path, WatcherListener listener);
+
+}
diff --git a/dubbo-remoting-extensions/dubbo-remoting-etcd3/src/main/resources/META-INF/dubbo/internal/org.apache.dubbo.remoting.etcd.EtcdTransporter b/dubbo-remoting-extensions/dubbo-remoting-etcd3/src/main/resources/META-INF/dubbo/internal/org.apache.dubbo.remoting.etcd.EtcdTransporter
new file mode 100644
index 0000000..c6ce47a
--- /dev/null
+++ b/dubbo-remoting-extensions/dubbo-remoting-etcd3/src/main/resources/META-INF/dubbo/internal/org.apache.dubbo.remoting.etcd.EtcdTransporter
@@ -0,0 +1 @@
+jetcd=org.apache.dubbo.remoting.etcd.jetcd.JEtcdTransporter
diff --git a/dubbo-remoting-extensions/dubbo-remoting-etcd3/src/test/java/org/apache/dubbo/remoting/etcd/jetcd/JEtcdClientTest.java b/dubbo-remoting-extensions/dubbo-remoting-etcd3/src/test/java/org/apache/dubbo/remoting/etcd/jetcd/JEtcdClientTest.java
new file mode 100644
index 0000000..15c1634
--- /dev/null
+++ b/dubbo-remoting-extensions/dubbo-remoting-etcd3/src/test/java/org/apache/dubbo/remoting/etcd/jetcd/JEtcdClientTest.java
@@ -0,0 +1,428 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dubbo.remoting.etcd.jetcd;
+
+import org.apache.dubbo.common.URL;
+import org.apache.dubbo.common.utils.ReflectUtils;
+import org.apache.dubbo.remoting.etcd.ChildListener;
+
+import com.google.protobuf.ByteString;
+import io.etcd.jetcd.ByteSequence;
+import io.etcd.jetcd.Client;
+import io.etcd.jetcd.Watch;
+import io.etcd.jetcd.api.Event;
+import io.etcd.jetcd.api.WatchCancelRequest;
+import io.etcd.jetcd.api.WatchCreateRequest;
+import io.etcd.jetcd.api.WatchGrpc;
+import io.etcd.jetcd.api.WatchRequest;
+import io.etcd.jetcd.api.WatchResponse;
+import io.etcd.jetcd.common.exception.ClosedClientException;
+import io.etcd.jetcd.watch.WatchEvent;
+import io.grpc.ManagedChannel;
+import io.grpc.Status;
+import io.grpc.stub.StreamObserver;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Disabled;
+import org.junit.jupiter.api.Test;
+
+import java.lang.reflect.Field;
+import java.lang.reflect.Method;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static org.apache.dubbo.remoting.etcd.Constants.SESSION_TIMEOUT_KEY;
+
+@Disabled
+public class JEtcdClientTest {
+
+    JEtcdClient client;
+
+    @Test
+    public void test_watch_when_create_path() throws InterruptedException {
+
+        String path = "/dubbo/com.alibaba.dubbo.demo.DemoService/providers";
+        String child = "/dubbo/com.alibaba.dubbo.demo.DemoService/providers/demoService1";
+
+        final CountDownLatch notNotified = new CountDownLatch(1);
+
+        ChildListener childListener = (parent, children) -> {
+            Assertions.assertEquals(1, children.size());
+            Assertions.assertEquals(child.substring(child.lastIndexOf("/") + 1), children.get(0));
+            notNotified.countDown();
+        };
+
+        client.addChildListener(path, childListener);
+
+        client.createEphemeral(child);
+        Assertions.assertTrue(notNotified.await(10, TimeUnit.SECONDS));
+
+        client.removeChildListener(path, childListener);
+        client.delete(child);
+    }
+
+    @Test
+    public void test_watch_when_modify() {
+        String path = "/dubbo/config/jetcd-client-unit-test/configurators";
+        String endpoint = "http://127.0.0.1:2379";
+        CountDownLatch latch = new CountDownLatch(1);
+        ByteSequence key = ByteSequence.from(path, UTF_8);
+
+        Watch.Listener listener = Watch.listener(response -> {
+            for (WatchEvent event : response.getEvents()) {
+                Assertions.assertEquals("PUT", event.getEventType().toString());
+                Assertions.assertEquals(path, event.getKeyValue().getKey().toString(UTF_8));
+                Assertions.assertEquals("Hello", event.getKeyValue().getValue().toString(UTF_8));
+                latch.countDown();
+            }
+
+        });
+
+        try (Client client = Client.builder().endpoints(endpoint).build();
+             Watch watch = client.getWatchClient();
+             Watch.Watcher watcher = watch.watch(key, listener)) {
+            // try to modify the key
+            client.getKVClient().put(ByteSequence.from(path, UTF_8), ByteSequence.from("Hello", UTF_8));
+            latch.await();
+        } catch (Exception e) {
+            Assertions.fail(e.getMessage());
+        }
+    }
+
+    @Test
+    public void testWatchWithGrpc() {
+        String path = "/dubbo/config/test_watch_with_grpc/configurators";
+        String endpoint = "http://127.0.0.1:2379";
+        CountDownLatch latch = new CountDownLatch(1);
+        try (Client client = Client.builder().endpoints(endpoint).build()) {
+            ManagedChannel channel = getChannel(client);
+            StreamObserver<WatchRequest> observer = WatchGrpc.newStub(channel).watch(new StreamObserver<WatchResponse>() {
+                @Override
+                public void onNext(WatchResponse response) {
+                    for (Event event : response.getEventsList()) {
+                        Assertions.assertEquals("PUT", event.getType().toString());
+                        Assertions.assertEquals(path, event.getKv().getKey().toString(UTF_8));
+                        Assertions.assertEquals("Hello", event.getKv().getValue().toString(UTF_8));
+                        latch.countDown();
+                    }
+                }
+
+                @Override
+                public void onError(Throwable throwable) {
+
+                }
+
+                @Override
+                public void onCompleted() {
+
+                }
+            });
+            WatchCreateRequest.Builder builder = WatchCreateRequest.newBuilder()
+                    .setKey(ByteString.copyFrom(path, UTF_8));
+
+            observer.onNext(WatchRequest.newBuilder().setCreateRequest(builder).build());
+
+            // try to modify the key
+            client.getKVClient().put(ByteSequence.from(path, UTF_8), ByteSequence.from("Hello", UTF_8));
+            latch.await(5, TimeUnit.SECONDS);
+        } catch (Exception e) {
+            Assertions.fail(e.getMessage());
+        }
+    }
+
+    @Test
+    public void testCancelWatchWithGrpc() {
+        String path = "/dubbo/config/testCancelWatchWithGrpc/configurators";
+        String endpoint = "http://127.0.0.1:2379";
+        CountDownLatch updateLatch = new CountDownLatch(1);
+        CountDownLatch cancelLatch = new CountDownLatch(1);
+        final AtomicLong watchID = new AtomicLong(-1L);
+        try (Client client = Client.builder().endpoints(endpoint).build()) {
+            ManagedChannel channel = getChannel(client);
+            StreamObserver<WatchRequest> observer = WatchGrpc.newStub(channel).watch(new StreamObserver<WatchResponse>() {
+                @Override
+                public void onNext(WatchResponse response) {
+                    watchID.set(response.getWatchId());
+                    for (Event event : response.getEventsList()) {
+                        Assertions.assertEquals("PUT", event.getType().toString());
+                        Assertions.assertEquals(path, event.getKv().getKey().toString(UTF_8));
+                        Assertions.assertEquals("Hello", event.getKv().getValue().toString(UTF_8));
+                        updateLatch.countDown();
+                    }
+                    if (response.getCanceled()) {
+                        // received the cancel response
+                        cancelLatch.countDown();
+                    }
+                }
+
+                @Override
+                public void onError(Throwable throwable) {
+
+                }
+
+                @Override
+                public void onCompleted() {
+
+                }
+            });
+            // create
+            WatchCreateRequest.Builder builder = WatchCreateRequest.newBuilder()
+                    .setKey(ByteString.copyFrom(path, UTF_8));
+
+            // make the grpc call to watch the key
+            observer.onNext(WatchRequest.newBuilder().setCreateRequest(builder).build());
+
+            // try to put the value
+            client.getKVClient().put(ByteSequence.from(path, UTF_8), ByteSequence.from("Hello", UTF_8));
+
+            // response received, latch counts down to zero
+            updateLatch.await();
+
+            WatchCancelRequest watchCancelRequest =
+                    WatchCancelRequest.newBuilder().setWatchId(watchID.get()).build();
+            WatchRequest cancelRequest = WatchRequest.newBuilder()
+                    .setCancelRequest(watchCancelRequest).build();
+            observer.onNext(cancelRequest);
+
+            // try to put the value
+            client.getKVClient().put(ByteSequence.from(path, UTF_8), ByteSequence.from("Hello world", UTF_8));
+
+            cancelLatch.await();
+        } catch (Exception e) {
+            Assertions.fail(e.getMessage());
+        }
+
+    }
+
+    @Test
+    public void test_watch_when_create_wrong_path() throws InterruptedException {
+
+        String path = "/dubbo/com.alibaba.dubbo.demo.DemoService/providers";
+        String child = "/dubbo/com.alibaba.dubbo.demo.DemoService/routers/demoService1";
+
+        final CountDownLatch notNotified = new CountDownLatch(1);
+
+        ChildListener childListener = (parent, children) -> {
+            Assertions.assertEquals(1, children.size());
+            Assertions.assertEquals(child, children.get(0));
+            notNotified.countDown();
+        };
+
+        client.addChildListener(path, childListener);
+
+        client.createEphemeral(child);
+        Assertions.assertFalse(notNotified.await(1, TimeUnit.SECONDS));
+
+        client.removeChildListener(path, childListener);
+        client.delete(child);
+    }
+
+    @Test
+    public void test_watch_when_delete_path() throws InterruptedException {
+
+        String path = "/dubbo/com.alibaba.dubbo.demo.DemoService/providers";
+        String child = "/dubbo/com.alibaba.dubbo.demo.DemoService/providers/demoService1";
+
+        final CountDownLatch notNotified = new CountDownLatch(1);
+
+        ChildListener childListener = (parent, children) -> {
+            Assertions.assertEquals(0, children.size());
+            notNotified.countDown();
+        };
+
+        client.createEphemeral(child);
+
+        client.addChildListener(path, childListener);
+        client.delete(child);
+
+        Assertions.assertTrue(notNotified.await(10, TimeUnit.SECONDS));
+        client.removeChildListener(path, childListener);
+    }
+
+    @Test
+    public void test_watch_then_unwatch() throws InterruptedException {
+
+        String path = "/dubbo/com.alibaba.dubbo.demo.DemoService/providers";
+        String child = "/dubbo/com.alibaba.dubbo.demo.DemoService/providers/demoService2";
+
+        final CountDownLatch notNotified = new CountDownLatch(1);
+        final CountDownLatch notTwiceNotified = new CountDownLatch(2);
+
+        final Holder notified = new Holder();
+
+        ChildListener childListener = (parent, children) -> {
+            Assertions.assertEquals(1, children.size());
+            Assertions.assertEquals(child.substring(child.lastIndexOf("/") + 1), children.get(0));
+            notNotified.countDown();
+            notTwiceNotified.countDown();
+            notified.getAndIncrease();
+        };
+
+        client.addChildListener(path, childListener);
+
+        client.createEphemeral(child);
+        Assertions.assertTrue(notNotified.await(15, TimeUnit.SECONDS));
+
+        client.removeChildListener(path, childListener);
+        client.delete(child);
+
+        Assertions.assertFalse(notTwiceNotified.await(5, TimeUnit.SECONDS));
+        Assertions.assertEquals(1, notified.value);
+        client.delete(child);
+    }
+
+    @Test
+    public void test_watch_on_unrecoverable_connection() throws InterruptedException {
+
+        String path = "/dubbo/com.alibaba.dubbo.demo.DemoService/providers";
+        JEtcdClient.EtcdWatcher watcher = null;
+        try {
+            ChildListener childListener = (parent, children) -> {
+                Assertions.assertEquals(path, parent);
+            };
+            client.addChildListener(path, childListener);
+            watcher = client.getChildListener(path, childListener);
+            watcher.watchRequest.onError(Status.ABORTED.withDescription("connection error").asRuntimeException());
+
+            watcher.watchRequest.onNext(watcher.nextRequest());
+        } catch (Exception e) {
+            Assertions.assertTrue(e.getMessage().contains("call was cancelled"));
+        }
+    }
+
+    @Test
+    public void test_watch_on_recoverable_connection() throws InterruptedException {
+
+        String path = "/dubbo/com.alibaba.dubbo.demo.DemoService/connection";
+        String child = "/dubbo/com.alibaba.dubbo.demo.DemoService/connection/demoService1";
+
+        final CountDownLatch notNotified = new CountDownLatch(1);
+        final CountDownLatch notTwiceNotified = new CountDownLatch(2);
+        final Holder notified = new Holder();
+        ChildListener childListener = (parent, children) -> {
+            notTwiceNotified.countDown();
+            switch (notified.increaseAndGet()) {
+                case 1: {
+                    notNotified.countDown();
+                    Assertions.assertEquals(1, children.size());
+                    Assertions.assertEquals(child.substring(child.lastIndexOf("/") + 1), children.get(0));
+                    break;
+                }
+                case 2: {
+                    Assertions.assertEquals(0, children.size());
+                    Assertions.assertEquals(path, parent);
+                    break;
+                }
+                default:
+                    Assertions.fail("two many callback invoked.");
+            }
+        };
+
+        client.addChildListener(path, childListener);
+        client.createEphemeral(child);
+
+        // make sure first time callback successfully
+        Assertions.assertTrue(notNotified.await(15, TimeUnit.SECONDS));
+
+        // connection error causes client to release all resources including current watcher
+        JEtcdClient.EtcdWatcher watcher = client.getChildListener(path, childListener);
+        watcher.onError(Status.UNAVAILABLE.withDescription("temporary connection issue").asRuntimeException());
+
+        // trigger delete after unavailable
+        client.delete(child);
+        Assertions.assertTrue(notTwiceNotified.await(15, TimeUnit.SECONDS));
+
+        client.removeChildListener(path, childListener);
+    }
+
+    @Test
+    public void test_watch_after_client_closed() throws InterruptedException {
+
+        String path = "/dubbo/com.alibaba.dubbo.demo.DemoService/providers";
+        client.close();
+
+        try {
+            client.addChildListener(path, (parent, children) -> {
+                Assertions.assertEquals(path, parent);
+            });
+        } catch (ClosedClientException e) {
+            Assertions.assertEquals("watch client has been closed, path '" + path + "'", e.getMessage());
+        }
+    }
+
+    @BeforeEach
+    public void setUp() {
+        // timeout in 15 seconds.
+        URL url = URL.valueOf("etcd3://127.0.0.1:2379/com.alibaba.dubbo.registry.RegistryService")
+                .addParameter(SESSION_TIMEOUT_KEY, 15000);
+
+        client = new JEtcdClient(url);
+    }
+
+    @AfterEach
+    public void tearDown() {
+        client.close();
+    }
+
+    static class Holder {
+
+        volatile int value;
+
+        synchronized int getAndIncrease() {
+            return value++;
+        }
+
+        synchronized int increaseAndGet() {
+            return ++value;
+        }
+    }
+
+    private ManagedChannel getChannel(Client client) {
+        try {
+            // hack, use reflection to get the shared channel.
+            Field connectionField = client.getClass().getDeclaredField("connectionManager");
+            connectionField.setAccessible(true);
+            Object connection = connectionField.get(client);
+            Method channelMethod = connection.getClass().getDeclaredMethod("getChannel");
+            ReflectUtils.makeAccessible(channelMethod);
+            ManagedChannel channel = (ManagedChannel) channelMethod.invoke(connection);
+            return channel;
+        } catch (Exception e) {
+            return null;
+        }
+    }
+}
diff --git a/dubbo-remoting-extensions/dubbo-remoting-etcd3/src/test/java/org/apache/dubbo/remoting/etcd/jetcd/JEtcdClientWrapperTest.java b/dubbo-remoting-extensions/dubbo-remoting-etcd3/src/test/java/org/apache/dubbo/remoting/etcd/jetcd/JEtcdClientWrapperTest.java
new file mode 100644
index 0000000..337b6de
--- /dev/null
+++ b/dubbo-remoting-extensions/dubbo-remoting-etcd3/src/test/java/org/apache/dubbo/remoting/etcd/jetcd/JEtcdClientWrapperTest.java
@@ -0,0 +1,187 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dubbo.remoting.etcd.jetcd;
+
+import org.apache.dubbo.common.URL;
+
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Disabled;
+import org.junit.jupiter.api.Test;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.locks.LockSupport;
+
+import static org.apache.dubbo.remoting.etcd.Constants.SESSION_TIMEOUT_KEY;
+import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.spy;
+
+@Disabled
+public class JEtcdClientWrapperTest {
+
+    JEtcdClientWrapper clientWrapper;
+
+    @Test
+    public void test_path_exists() {
+        String path = "/dubbo/org.apache.dubbo.demo.DemoService/providers";
+        clientWrapper.createPersistent(path);
+        Assertions.assertTrue(clientWrapper.checkExists(path));
+        Assertions.assertFalse(clientWrapper.checkExists(path + "/noneexits"));
+        clientWrapper.delete(path);
+    }
+
+    @Test
+    public void test_create_emerphal_path() {
+        String path = "/dubbo/org.apache.dubbo.demo.DemoService/providers";
+        clientWrapper.createEphemeral(path);
+        Assertions.assertTrue(clientWrapper.checkExists(path));
+        clientWrapper.delete(path);
+    }
+
+    @Test
+    public void test_grant_lease_then_revoke() {
+        long lease = clientWrapper.createLease(1);
+        clientWrapper.revokeLease(lease);
+
+        long newLease = clientWrapper.createLease(1);
+        LockSupport.parkNanos(this, TimeUnit.SECONDS.toNanos(2));
+        // test timeout of lease
+        clientWrapper.revokeLease(newLease);
+    }
+
+    @Test
+    public void test_create_emerphal_path_then_timeout() {
+        String path = "/dubbo/org.apache.dubbo.demo.DemoService/providers";
+
+        URL url = URL.valueOf("etcd3://127.0.0.1:2379/org.apache.dubbo.registry.RegistryService")
+                .addParameter(SESSION_TIMEOUT_KEY, 1000);
+
+        JEtcdClientWrapper saved = clientWrapper;
+
+        try {
+            clientWrapper = spy(new JEtcdClientWrapper(url));
+            clientWrapper.start();
+
+            doAnswer(new Answer() {
+                int timeout;
+
+                @Override
+                public Object answer(InvocationOnMock invocation) throws Throwable {
+                    LockSupport.parkNanos(this, TimeUnit.SECONDS.toNanos(2));
+                    if (timeout++ > 0) {
+                        throw new TimeoutException();
+                    }
+                    return null;
+                }
+            }).when(clientWrapper).keepAlive(anyLong());
+
+            try {
+                clientWrapper.createEphemeral(path);
+            } catch (IllegalStateException ex) {
+                Assertions.assertEquals("failed to create ephereral by path '" + path + "'", ex.getMessage());
+            }
+
+        } finally {
+            clientWrapper.doClose();
+            clientWrapper = saved;
+        }
+    }
+
+    @Test
+    public void test_get_emerphal_children_path() {
+        String path = "/dubbo/org.apache.dubbo.demo.DemoService/providers";
+        String[] children = {
+                "/dubbo/org.apache.dubbo.demo.DemoService/providers/service1"
+                , "/dubbo/org.apache.dubbo.demo.DemoService/providers/service2"
+                , "/dubbo/org.apache.dubbo.demo.DemoService/providers/service3"
+                , "/dubbo/org.apache.dubbo.demo.DemoService/providers/service4"
+                , "/dubbo/org.apache.dubbo.demo.DemoService/providers/service5/exclude"
+        };
+
+        Arrays.stream(children).forEach((child) -> {
+            Assertions.assertFalse(clientWrapper.checkExists(child));
+            clientWrapper.createEphemeral(child);
+        });
+
+        List<String> extected = clientWrapper.getChildren(path);
+
+        Assertions.assertEquals(4, extected.size());
+        extected.stream().forEach((child) -> {
+            boolean found = false;
+            for (int i = 0; i < children.length; ++i) {
+                if (child.equals(children[i])) {
+                    found = true;
+                    break;
+                }
+            }
+            Assertions.assertTrue(found);
+            clientWrapper.delete(child);
+        });
+    }
+
+    @Test
+    public void test_connect_cluster() {
+        URL url = URL.valueOf("etcd3://127.0.0.1:22379/org.apache.dubbo.registry.RegistryService?backup=127.0.0.1:2379,127.0.0.1:32379");
+        JEtcdClientWrapper clientWrapper = new JEtcdClientWrapper(url);
+        try {
+            clientWrapper.start();
+            String path = "/dubbo/org.apache.dubbo.demo.DemoService/providers";
+            clientWrapper.createEphemeral(path);
+            Assertions.assertTrue(clientWrapper.checkExists(path));
+            Assertions.assertFalse(clientWrapper.checkExists(path + "/noneexits"));
+            clientWrapper.delete(path);
+        } finally {
+            clientWrapper.doClose();
+        }
+    }
+
+    @BeforeEach
+    public void setUp() {
+        URL url = URL.valueOf("etcd3://127.0.0.1:2379/org.apache.dubbo.registry.RegistryService");
+        clientWrapper = new JEtcdClientWrapper(url);
+        clientWrapper.start();
+    }
+
+    @AfterEach
+    public void tearDown() {
+        clientWrapper.doClose();
+    }
+}
diff --git a/dubbo-remoting-extensions/dubbo-remoting-etcd3/src/test/java/org/apache/dubbo/remoting/etcd/jetcd/LeaseTest.java b/dubbo-remoting-extensions/dubbo-remoting-etcd3/src/test/java/org/apache/dubbo/remoting/etcd/jetcd/LeaseTest.java
new file mode 100644
index 0000000..6188984
--- /dev/null
+++ b/dubbo-remoting-extensions/dubbo-remoting-etcd3/src/test/java/org/apache/dubbo/remoting/etcd/jetcd/LeaseTest.java
@@ -0,0 +1,156 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dubbo.remoting.etcd.jetcd;
+
+import com.google.common.base.Charsets;
+import io.etcd.jetcd.ByteSequence;
+import io.etcd.jetcd.Client;
+import io.etcd.jetcd.KV;
+import io.etcd.jetcd.Lease;
+import io.etcd.jetcd.launcher.EtcdCluster;
+import io.etcd.jetcd.launcher.EtcdClusterFactory;
+import io.etcd.jetcd.lease.LeaseKeepAliveResponse;
+import io.etcd.jetcd.options.PutOption;
+import io.etcd.jetcd.support.CloseableClient;
+import io.etcd.jetcd.support.Observers;
+import io.grpc.stub.StreamObserver;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Disabled;
+import org.junit.jupiter.api.Test;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/**
+ * @author cvictory ON 2019-08-16
+ */
+@Disabled
+public class LeaseTest {
+
+    private static EtcdCluster cluster;
+
+    private KV kvClient;
+    private Client client;
+    private Lease leaseClient;
+
+    private static final ByteSequence KEY = ByteSequence.from("foo", Charsets.UTF_8);
+    private static final ByteSequence KEY_2 = ByteSequence.from("foo2", Charsets.UTF_8);
+    private static final ByteSequence VALUE = ByteSequence.from("bar", Charsets.UTF_8);
+
+    @BeforeAll
+    public static void beforeClass() {
+        cluster = EtcdClusterFactory.buildCluster("etcd-lease", 3, false);
+        cluster.start();
+    }
+
+    @AfterAll
+    public static void afterClass() {
+        cluster.close();
+    }
+
+    @BeforeEach
+    public void setUp() {
+        client = Client.builder().endpoints(cluster.getClientEndpoints()).build();
+        kvClient = client.getKVClient();
+        leaseClient = client.getLeaseClient();
+    }
+
+    @AfterEach
+    public void tearDown() {
+        if (client != null) {
+            client.close();
+        }
+
+    }
+
+    @Test
+    public void testGrant() throws Exception {
+        long leaseID = leaseClient.grant(5).get().getID();
+
+        kvClient.put(KEY, VALUE, PutOption.newBuilder().withLeaseId(leaseID).build()).get();
+        assertThat(kvClient.get(KEY).get().getCount()).isEqualTo(1);
+
+        Thread.sleep(6000);
+        assertThat(kvClient.get(KEY).get().getCount()).isEqualTo(0);
+    }
+
+    @Test
+    public void testRevoke() throws Exception {
+        long leaseID = leaseClient.grant(5).get().getID();
+        kvClient.put(KEY, VALUE, PutOption.newBuilder().withLeaseId(leaseID).build()).get();
+        assertThat(kvClient.get(KEY).get().getCount()).isEqualTo(1);
+        leaseClient.revoke(leaseID).get();
+        assertThat(kvClient.get(KEY).get().getCount()).isEqualTo(0);
+    }
+
+    @Test
+    public void testKeepAliveOnce() throws ExecutionException, InterruptedException {
+        long leaseID = leaseClient.grant(2).get().getID();
+        kvClient.put(KEY, VALUE, PutOption.newBuilder().withLeaseId(leaseID).build()).get();
+        assertThat(kvClient.get(KEY).get().getCount()).isEqualTo(1);
+        LeaseKeepAliveResponse rp = leaseClient.keepAliveOnce(leaseID).get();
+        assertThat(rp.getTTL()).isGreaterThan(0);
+    }
+
+    @Test
+    public void testKeepAlive() throws ExecutionException, InterruptedException {
+        long leaseID = leaseClient.grant(2).get().getID();
+        kvClient.put(KEY, VALUE, PutOption.newBuilder().withLeaseId(leaseID).build()).get();
+        assertThat(kvClient.get(KEY).get().getCount()).isEqualTo(1);
+
+        CountDownLatch latch = new CountDownLatch(1);
+        AtomicReference<LeaseKeepAliveResponse> responseRef = new AtomicReference<>();
+        StreamObserver<LeaseKeepAliveResponse> observer = Observers.observer(response -> {
+            responseRef.set(response);
+            latch.countDown();
+        });
+
+        try (CloseableClient c = leaseClient.keepAlive(leaseID, observer)) {
+            latch.await(5, TimeUnit.SECONDS);
+            LeaseKeepAliveResponse response = responseRef.get();
+            assertThat(response.getTTL()).isGreaterThan(0);
+        }
+
+        Thread.sleep(3000);
+        assertThat(kvClient.get(KEY).get().getCount()).isEqualTo(0);
+    }
+
+}
diff --git a/dubbo-remoting-extensions/pom.xml b/dubbo-remoting-extensions/dubbo-remoting-grizzly/pom.xml
similarity index 54%
copy from dubbo-remoting-extensions/pom.xml
copy to dubbo-remoting-extensions/dubbo-remoting-grizzly/pom.xml
index 6a6b72a..ce69f0e 100644
--- a/dubbo-remoting-extensions/pom.xml
+++ b/dubbo-remoting-extensions/dubbo-remoting-grizzly/pom.xml
@@ -1,4 +1,3 @@
-<?xml version="1.0" encoding="UTF-8"?>
 <!--
   Licensed to the Apache Software Foundation (ASF) under one or more
   contributor license agreements.  See the NOTICE file distributed with
@@ -15,24 +14,29 @@
   See the License for the specific language governing permissions and
   limitations under the License.
   -->
-<project xmlns="http://maven.apache.org/POM/4.0.0"
-         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
-         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
     <parent>
+        <artifactId>dubbo-remoting-extensions</artifactId>
         <groupId>org.apache.dubbo.extensions</groupId>
-        <artifactId>extensions-parent</artifactId>
         <version>${revision}</version>
         <relativePath>../pom.xml</relativePath>
     </parent>
-    <modelVersion>4.0.0</modelVersion>
-
-    <artifactId>dubbo-remoting-extensions</artifactId>
-    <version>${revision}</version>
-    <packaging>pom</packaging>
-
-    <modules>
-        <module>dubbo-remoting-quic</module>
-    </modules>
 
+    <modelVersion>4.0.0</modelVersion>
+    <version>1.0.0-SNAPSHOT</version>
+    <artifactId>dubbo-remoting-grizzly</artifactId>
+    <packaging>jar</packaging>
+    <name>${project.artifactId}</name>
+    <description>The grizzly remoting module of dubbo project</description>
 
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.dubbo</groupId>
+            <artifactId>dubbo-remoting-api</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.glassfish.grizzly</groupId>
+            <artifactId>grizzly-core</artifactId>
+        </dependency>
+    </dependencies>
 </project>
diff --git a/dubbo-remoting-extensions/dubbo-remoting-grizzly/src/main/java/org/apache/dubbo/remoting/transport/grizzly/GrizzlyChannel.java b/dubbo-remoting-extensions/dubbo-remoting-grizzly/src/main/java/org/apache/dubbo/remoting/transport/grizzly/GrizzlyChannel.java
new file mode 100644
index 0000000..b8f1736
--- /dev/null
+++ b/dubbo-remoting-extensions/dubbo-remoting-grizzly/src/main/java/org/apache/dubbo/remoting/transport/grizzly/GrizzlyChannel.java
@@ -0,0 +1,198 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dubbo.remoting.transport.grizzly;
+
+import org.apache.dubbo.common.URL;
+import org.apache.dubbo.common.logger.Logger;
+import org.apache.dubbo.common.logger.LoggerFactory;
+import org.apache.dubbo.remoting.ChannelHandler;
+import org.apache.dubbo.remoting.RemotingException;
+import org.apache.dubbo.remoting.transport.AbstractChannel;
+
+import org.glassfish.grizzly.Connection;
+import org.glassfish.grizzly.Grizzly;
+import org.glassfish.grizzly.GrizzlyFuture;
+import org.glassfish.grizzly.attributes.Attribute;
+
+import java.net.InetSocketAddress;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import static org.apache.dubbo.common.constants.CommonConstants.DEFAULT_TIMEOUT;
+import static org.apache.dubbo.common.constants.CommonConstants.TIMEOUT_KEY;
+
+/**
+ * GrizzlyChannel
+ *
+ *
+ */
+final class GrizzlyChannel extends AbstractChannel {
+
+    private static final Logger logger = LoggerFactory.getLogger(GrizzlyChannel.class);
+
+    private static final String CHANNEL_KEY = GrizzlyChannel.class.getName() + ".CHANNEL";
+
+    private static final Attribute<GrizzlyChannel> ATTRIBUTE = Grizzly.DEFAULT_ATTRIBUTE_BUILDER.createAttribute(CHANNEL_KEY);
+
+    private final Connection<?> connection;
+
+    /**
+     * @param connection
+     * @param url
+     * @param handler
+     */
+    private GrizzlyChannel(Connection<?> connection, URL url, ChannelHandler handler) {
+        super(url, handler);
+        if (connection == null) {
+            throw new IllegalArgumentException("grizzly connection == null");
+        }
+        this.connection = connection;
+    }
+
+    static GrizzlyChannel getOrAddChannel(Connection<?> connection, URL url, ChannelHandler handler) {
+        if (connection == null) {
+            return null;
+        }
+        GrizzlyChannel ret = ATTRIBUTE.get(connection);
+        if (ret == null) {
+            ret = new GrizzlyChannel(connection, url, handler);
+            if (connection.isOpen()) {
+                ATTRIBUTE.set(connection, ret);
+            }
+        }
+        return ret;
+    }
+
+    static void removeChannelIfDisconnected(Connection<?> connection) {
+        if (connection != null && !connection.isOpen()) {
+            ATTRIBUTE.remove(connection);
+        }
+    }
+
+    @Override
+    public InetSocketAddress getRemoteAddress() {
+        return (InetSocketAddress) connection.getPeerAddress();
+    }
+
+    @Override
+    public boolean isConnected() {
+        return connection.isOpen();
+    }
+
+    @Override
+    public InetSocketAddress getLocalAddress() {
+        return (InetSocketAddress) connection.getLocalAddress();
+    }
+
+    @Override
+    @SuppressWarnings("rawtypes")
+    public void send(Object message, boolean sent) throws RemotingException {
+        super.send(message, sent);
+
+        int timeout = 0;
+        try {
+            GrizzlyFuture future = connection.write(message);
+            if (sent) {
+                timeout = getUrl().getPositiveParameter(TIMEOUT_KEY, DEFAULT_TIMEOUT);
+                future.get(timeout, TimeUnit.MILLISECONDS);
+            }
+        } catch (TimeoutException e) {
+            throw new RemotingException(this, "Failed to send message " + message + " to " + getRemoteAddress()
+                    + "in timeout(" + timeout + "ms) limit", e);
+        } catch (Throwable e) {
+            throw new RemotingException(this, "Failed to send message " + message + " to " + getRemoteAddress() + ", cause: " + e.getMessage(), e);
+        }
+    }
+
+    @Override
+    public void close() {
+        try {
+            super.close();
+        } catch (Exception e) {
+            logger.warn(e.getMessage(), e);
+        }
+        try {
+            removeChannelIfDisconnected(connection);
+        } catch (Exception e) {
+            logger.warn(e.getMessage(), e);
+        }
+        try {
+            if (logger.isInfoEnabled()) {
+                logger.info("Close grizzly channel " + connection);
+            }
+            connection.close();
+        } catch (Exception e) {
+            logger.warn(e.getMessage(), e);
+        }
+    }
+
+    @Override
+    public boolean hasAttribute(String key) {
+        return getAttribute(key) == null;
+    }
+
+    @Override
+    public Object getAttribute(String key) {
+        return Grizzly.DEFAULT_ATTRIBUTE_BUILDER.createAttribute(key).get(connection);
+    }
+
+    @Override
+    public void setAttribute(String key, Object value) {
+        Grizzly.DEFAULT_ATTRIBUTE_BUILDER.createAttribute(key).set(connection, value);
+    }
+
+    @Override
+    public void removeAttribute(String key) {
+        Grizzly.DEFAULT_ATTRIBUTE_BUILDER.createAttribute(key).remove(connection);
+    }
+
+    @Override
+    public int hashCode() {
+        final int prime = 31;
+        int result = 1;
+        result = prime * result + ((connection == null) ? 0 : connection.hashCode());
+        return result;
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+        if (this == obj) {
+            return true;
+        }
+        if (obj == null) {
+            return false;
+        }
+        if (getClass() != obj.getClass()) {
+            return false;
+        }
+        GrizzlyChannel other = (GrizzlyChannel) obj;
+        if (connection == null) {
+            if (other.connection != null) {
+                return false;
+            }
+        } else if (!connection.equals(other.connection)) {
+            return false;
+        }
+        return true;
+    }
+
+    @Override
+    public String toString() {
+        return "GrizzlyChannel [connection=" + connection + "]";
+    }
+
+}
diff --git a/dubbo-remoting-extensions/dubbo-remoting-grizzly/src/main/java/org/apache/dubbo/remoting/transport/grizzly/GrizzlyClient.java b/dubbo-remoting-extensions/dubbo-remoting-grizzly/src/main/java/org/apache/dubbo/remoting/transport/grizzly/GrizzlyClient.java
new file mode 100644
index 0000000..9c290c7
--- /dev/null
+++ b/dubbo-remoting-extensions/dubbo-remoting-grizzly/src/main/java/org/apache/dubbo/remoting/transport/grizzly/GrizzlyClient.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dubbo.remoting.transport.grizzly;
+
+import org.apache.dubbo.common.URL;
+import org.apache.dubbo.common.logger.Logger;
+import org.apache.dubbo.common.logger.LoggerFactory;
+import org.apache.dubbo.remoting.Channel;
+import org.apache.dubbo.remoting.ChannelHandler;
+import org.apache.dubbo.remoting.RemotingException;
+import org.apache.dubbo.remoting.transport.AbstractClient;
+
+import org.glassfish.grizzly.Connection;
+import org.glassfish.grizzly.filterchain.FilterChainBuilder;
+import org.glassfish.grizzly.filterchain.TransportFilter;
+import org.glassfish.grizzly.nio.transport.TCPNIOTransport;
+import org.glassfish.grizzly.nio.transport.TCPNIOTransportBuilder;
+import org.glassfish.grizzly.strategies.SameThreadIOStrategy;
+import org.glassfish.grizzly.threadpool.ThreadPoolConfig;
+
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.dubbo.common.constants.CommonConstants.DEFAULT_TIMEOUT;
+import static org.apache.dubbo.common.constants.CommonConstants.TIMEOUT_KEY;
+
+/**
+ * GrizzlyClient
+ *
+ *
+ */
+public class GrizzlyClient extends AbstractClient {
+
+    private static final Logger logger = LoggerFactory.getLogger(GrizzlyClient.class);
+
+    private TCPNIOTransport transport;
+
+    private volatile Connection<?> connection; // volatile, please copy reference to use
+
+    public GrizzlyClient(URL url, ChannelHandler handler) throws RemotingException {
+        super(url, handler);
+    }
+
+    @Override
+    protected void doOpen() throws Throwable {
+        FilterChainBuilder filterChainBuilder = FilterChainBuilder.stateless();
+        filterChainBuilder.add(new TransportFilter());
+        filterChainBuilder.add(new GrizzlyCodecAdapter(getCodec(), getUrl(), this));
+        filterChainBuilder.add(new GrizzlyHandler(getUrl(), this));
+        TCPNIOTransportBuilder builder = TCPNIOTransportBuilder.newInstance();
+        ThreadPoolConfig config = builder.getWorkerThreadPoolConfig();
+        config.setPoolName(CLIENT_THREAD_POOL_NAME)
+                .setQueueLimit(-1)
+                .setCorePoolSize(0)
+                .setMaxPoolSize(Integer.MAX_VALUE)
+                .setKeepAliveTime(60L, TimeUnit.SECONDS);
+        builder.setTcpNoDelay(true).setKeepAlive(true)
+                .setConnectionTimeout(getConnectTimeout())
+                .setIOStrategy(SameThreadIOStrategy.getInstance());
+        transport = builder.build();
+        transport.setProcessor(filterChainBuilder.build());
+        transport.start();
+    }
+
+
+    @Override
+    protected void doConnect() throws Throwable {
+        connection = transport.connect(getConnectAddress())
+                .get(getUrl().getPositiveParameter(TIMEOUT_KEY, DEFAULT_TIMEOUT), TimeUnit.MILLISECONDS);
+    }
+
+    @Override
+    protected void doDisConnect() throws Throwable {
+        try {
+            GrizzlyChannel.removeChannelIfDisconnected(connection);
+        } catch (Throwable t) {
+            logger.warn(t.getMessage());
+        }
+    }
+
+    @Override
+    protected void doClose() throws Throwable {
+        try {
+            transport.stop();
+        } catch (Throwable e) {
+            logger.warn(e.getMessage(), e);
+        }
+    }
+
+    @Override
+    protected Channel getChannel() {
+        Connection<?> c = connection;
+        if (c == null || !c.isOpen()) {
+            return null;
+        }
+        return GrizzlyChannel.getOrAddChannel(c, getUrl(), this);
+    }
+
+}
diff --git a/dubbo-remoting-extensions/dubbo-remoting-grizzly/src/main/java/org/apache/dubbo/remoting/transport/grizzly/GrizzlyCodecAdapter.java b/dubbo-remoting-extensions/dubbo-remoting-grizzly/src/main/java/org/apache/dubbo/remoting/transport/grizzly/GrizzlyCodecAdapter.java
new file mode 100644
index 0000000..898d3ce
--- /dev/null
+++ b/dubbo-remoting-extensions/dubbo-remoting-grizzly/src/main/java/org/apache/dubbo/remoting/transport/grizzly/GrizzlyCodecAdapter.java
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dubbo.remoting.transport.grizzly;
+
+import org.apache.dubbo.common.URL;
+import org.apache.dubbo.remoting.Channel;
+import org.apache.dubbo.remoting.ChannelHandler;
+import org.apache.dubbo.remoting.Codec2;
+import org.apache.dubbo.remoting.buffer.ChannelBuffer;
+import org.apache.dubbo.remoting.buffer.ChannelBuffers;
+import org.apache.dubbo.remoting.buffer.DynamicChannelBuffer;
+
+import org.glassfish.grizzly.Buffer;
+import org.glassfish.grizzly.Connection;
+import org.glassfish.grizzly.filterchain.BaseFilter;
+import org.glassfish.grizzly.filterchain.FilterChainContext;
+import org.glassfish.grizzly.filterchain.NextAction;
+
+import java.io.IOException;
+
+import static org.apache.dubbo.remoting.Constants.BUFFER_KEY;
+import static org.apache.dubbo.remoting.Constants.DEFAULT_BUFFER_SIZE;
+import static org.apache.dubbo.remoting.Constants.MAX_BUFFER_SIZE;
+import static org.apache.dubbo.remoting.Constants.MIN_BUFFER_SIZE;
+
+/**
+ * GrizzlyCodecAdapter
+ */
+public class GrizzlyCodecAdapter extends BaseFilter {
+
+    private final Codec2 codec;
+
+    private final URL url;
+
+    private final ChannelHandler handler;
+
+    private final int bufferSize;
+
+    private ChannelBuffer previousData = ChannelBuffers.EMPTY_BUFFER;
+
+    public GrizzlyCodecAdapter(Codec2 codec, URL url, ChannelHandler handler) {
+        this.codec = codec;
+        this.url = url;
+        this.handler = handler;
+        int b = url.getPositiveParameter(BUFFER_KEY, DEFAULT_BUFFER_SIZE);
+        this.bufferSize = b >= MIN_BUFFER_SIZE && b <= MAX_BUFFER_SIZE ? b : DEFAULT_BUFFER_SIZE;
+    }
+
+    @Override
+    public NextAction handleWrite(FilterChainContext context) throws IOException {
+        Connection<?> connection = context.getConnection();
+        GrizzlyChannel channel = GrizzlyChannel.getOrAddChannel(connection, url, handler);
+        try {
+            ChannelBuffer channelBuffer = ChannelBuffers.dynamicBuffer(1024); // Do not need to close
+
+            Object msg = context.getMessage();
+            codec.encode(channel, channelBuffer, msg);
+
+            GrizzlyChannel.removeChannelIfDisconnected(connection);
+            Buffer buffer = connection.getTransport().getMemoryManager().allocate(channelBuffer.readableBytes());
+            buffer.put(channelBuffer.toByteBuffer());
+            buffer.flip();
+            buffer.allowBufferDispose(true);
+            context.setMessage(buffer);
+        } finally {
+            GrizzlyChannel.removeChannelIfDisconnected(connection);
+        }
+        return context.getInvokeAction();
+    }
+
+    @Override
+    public NextAction handleRead(FilterChainContext context) throws IOException {
+        Object message = context.getMessage();
+        Connection<?> connection = context.getConnection();
+        Channel channel = GrizzlyChannel.getOrAddChannel(connection, url, handler);
+        try {
+            if (message instanceof Buffer) { // receive a new packet
+                Buffer grizzlyBuffer = (Buffer) message; // buffer
+
+                ChannelBuffer frame;
+
+                if (previousData.readable()) {
+                    if (previousData instanceof DynamicChannelBuffer) {
+                        previousData.writeBytes(grizzlyBuffer.toByteBuffer());
+                        frame = previousData;
+                    } else {
+                        int size = previousData.readableBytes() + grizzlyBuffer.remaining();
+                        frame = ChannelBuffers.dynamicBuffer(size > bufferSize ? size : bufferSize);
+                        frame.writeBytes(previousData, previousData.readableBytes());
+                        frame.writeBytes(grizzlyBuffer.toByteBuffer());
+                    }
+                } else {
+                    frame = ChannelBuffers.wrappedBuffer(grizzlyBuffer.toByteBuffer());
+                }
+
+                Object msg;
+                int savedReadIndex;
+
+                do {
+                    savedReadIndex = frame.readerIndex();
+                    try {
+                        msg = codec.decode(channel, frame);
+                    } catch (Exception e) {
+                        previousData = ChannelBuffers.EMPTY_BUFFER;
+                        throw new IOException(e.getMessage(), e);
+                    }
+                    if (msg == Codec2.DecodeResult.NEED_MORE_INPUT) {
+                        frame.readerIndex(savedReadIndex);
+                        return context.getStopAction();
+                    } else {
+                        if (savedReadIndex == frame.readerIndex()) {
+                            previousData = ChannelBuffers.EMPTY_BUFFER;
+                            throw new IOException("Decode without read data.");
+                        }
+                        if (msg != null) {
+                            context.setMessage(msg);
+                        }
+                        return context.getInvokeAction();
+                    }
+                } while (frame.readable());
+            } else { // Other events are passed down directly
+                return context.getInvokeAction();
+            }
+        } finally {
+            GrizzlyChannel.removeChannelIfDisconnected(connection);
+        }
+    }
+
+}
diff --git a/dubbo-remoting-extensions/dubbo-remoting-grizzly/src/main/java/org/apache/dubbo/remoting/transport/grizzly/GrizzlyHandler.java b/dubbo-remoting-extensions/dubbo-remoting-grizzly/src/main/java/org/apache/dubbo/remoting/transport/grizzly/GrizzlyHandler.java
new file mode 100644
index 0000000..f162911
--- /dev/null
+++ b/dubbo-remoting-extensions/dubbo-remoting-grizzly/src/main/java/org/apache/dubbo/remoting/transport/grizzly/GrizzlyHandler.java
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dubbo.remoting.transport.grizzly;
+
+import org.apache.dubbo.common.URL;
+import org.apache.dubbo.common.logger.Logger;
+import org.apache.dubbo.common.logger.LoggerFactory;
+import org.apache.dubbo.common.utils.StringUtils;
+import org.apache.dubbo.remoting.ChannelHandler;
+import org.apache.dubbo.remoting.RemotingException;
+
+import org.glassfish.grizzly.Connection;
+import org.glassfish.grizzly.filterchain.BaseFilter;
+import org.glassfish.grizzly.filterchain.FilterChainContext;
+import org.glassfish.grizzly.filterchain.NextAction;
+
+import java.io.IOException;
+
+/**
+ * GrizzlyHandler
+ */
+public class GrizzlyHandler extends BaseFilter {
+
+    private static final Logger logger = LoggerFactory.getLogger(GrizzlyHandler.class);
+
+    private final URL url;
+
+    private final ChannelHandler handler;
+
+    public GrizzlyHandler(URL url, ChannelHandler handler) {
+        this.url = url;
+        this.handler = handler;
+    }
+
+    @Override
+    public NextAction handleConnect(FilterChainContext ctx) throws IOException {
+        Connection<?> connection = ctx.getConnection();
+        GrizzlyChannel channel = GrizzlyChannel.getOrAddChannel(connection, url, handler);
+        try {
+            handler.connected(channel);
+        } catch (RemotingException e) {
+            throw new IOException(StringUtils.toString(e));
+        } finally {
+            GrizzlyChannel.removeChannelIfDisconnected(connection);
+        }
+        return ctx.getInvokeAction();
+    }
+
+    @Override
+    public NextAction handleClose(FilterChainContext ctx) throws IOException {
+        Connection<?> connection = ctx.getConnection();
+        GrizzlyChannel channel = GrizzlyChannel.getOrAddChannel(connection, url, handler);
+        try {
+            handler.disconnected(channel);
+        } catch (RemotingException e) {
+            throw new IOException(StringUtils.toString(e));
+        } finally {
+            GrizzlyChannel.removeChannelIfDisconnected(connection);
+        }
+        return ctx.getInvokeAction();
+    }
+
+    @Override
+    public NextAction handleRead(FilterChainContext ctx) throws IOException {
+        Connection<?> connection = ctx.getConnection();
+        GrizzlyChannel channel = GrizzlyChannel.getOrAddChannel(connection, url, handler);
+        try {
+            handler.received(channel, ctx.getMessage());
+        } catch (RemotingException e) {
+            throw new IOException(StringUtils.toString(e));
+        } finally {
+            GrizzlyChannel.removeChannelIfDisconnected(connection);
+        }
+        return ctx.getInvokeAction();
+    }
+
+    @Override
+    public NextAction handleWrite(FilterChainContext ctx) throws IOException {
+        Connection<?> connection = ctx.getConnection();
+        GrizzlyChannel channel = GrizzlyChannel.getOrAddChannel(connection, url, handler);
+        try {
+            handler.sent(channel, ctx.getMessage());
+        } catch (RemotingException e) {
+            throw new IOException(StringUtils.toString(e));
+        } finally {
+            GrizzlyChannel.removeChannelIfDisconnected(connection);
+        }
+        return ctx.getInvokeAction();
+    }
+
+    @Override
+    public void exceptionOccurred(FilterChainContext ctx, Throwable error) {
+        Connection<?> connection = ctx.getConnection();
+        GrizzlyChannel channel = GrizzlyChannel.getOrAddChannel(connection, url, handler);
+        try {
+            handler.caught(channel, error);
+        } catch (RemotingException e) {
+            logger.error("RemotingException on channel " + channel, e);
+        } finally {
+            GrizzlyChannel.removeChannelIfDisconnected(connection);
+        }
+    }
+
+}
diff --git a/dubbo-remoting-extensions/dubbo-remoting-grizzly/src/main/java/org/apache/dubbo/remoting/transport/grizzly/GrizzlyServer.java b/dubbo-remoting-extensions/dubbo-remoting-grizzly/src/main/java/org/apache/dubbo/remoting/transport/grizzly/GrizzlyServer.java
new file mode 100644
index 0000000..b323841
--- /dev/null
+++ b/dubbo-remoting-extensions/dubbo-remoting-grizzly/src/main/java/org/apache/dubbo/remoting/transport/grizzly/GrizzlyServer.java
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dubbo.remoting.transport.grizzly;
+
+import org.apache.dubbo.common.URL;
+import org.apache.dubbo.common.logger.Logger;
+import org.apache.dubbo.common.logger.LoggerFactory;
+import org.apache.dubbo.common.utils.NetUtils;
+import org.apache.dubbo.remoting.Channel;
+import org.apache.dubbo.remoting.ChannelHandler;
+import org.apache.dubbo.remoting.RemotingException;
+import org.apache.dubbo.remoting.transport.AbstractServer;
+
+import org.glassfish.grizzly.filterchain.FilterChainBuilder;
+import org.glassfish.grizzly.filterchain.TransportFilter;
+import org.glassfish.grizzly.nio.transport.TCPNIOTransport;
+import org.glassfish.grizzly.nio.transport.TCPNIOTransportBuilder;
+import org.glassfish.grizzly.strategies.SameThreadIOStrategy;
+import org.glassfish.grizzly.threadpool.ThreadPoolConfig;
+
+import java.net.InetSocketAddress;
+import java.util.Collection;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.dubbo.common.constants.CommonConstants.DEFAULT_THREADPOOL;
+import static org.apache.dubbo.common.constants.CommonConstants.DEFAULT_THREADS;
+import static org.apache.dubbo.common.constants.CommonConstants.THREADPOOL_KEY;
+import static org.apache.dubbo.common.constants.CommonConstants.THREADS_KEY;
+
+/**
+ * GrizzlyServer
+ */
+public class GrizzlyServer extends AbstractServer {
+
+    private static final Logger logger = LoggerFactory.getLogger(GrizzlyServer.class);
+
+    private final Map<String, Channel> channels = new ConcurrentHashMap<String, Channel>(); // <ip:port, channel>
+
+    private TCPNIOTransport transport;
+
+    public GrizzlyServer(URL url, ChannelHandler handler) throws RemotingException {
+        super(url, handler);
+    }
+
+    @Override
+    protected void doOpen() throws Throwable {
+        FilterChainBuilder filterChainBuilder = FilterChainBuilder.stateless();
+        filterChainBuilder.add(new TransportFilter());
+        filterChainBuilder.add(new GrizzlyCodecAdapter(getCodec(), getUrl(), this));
+        filterChainBuilder.add(new GrizzlyHandler(getUrl(), this));
+
+        TCPNIOTransportBuilder builder = TCPNIOTransportBuilder.newInstance();
+        ThreadPoolConfig config = ThreadPoolConfig.defaultConfig();
+        config.setPoolName(SERVER_THREAD_POOL_NAME).setQueueLimit(-1);
+        String threadpool = getUrl().getParameter(THREADPOOL_KEY, DEFAULT_THREADPOOL);
+        if (DEFAULT_THREADPOOL.equals(threadpool)) {
+            int threads = getUrl().getPositiveParameter(THREADS_KEY, DEFAULT_THREADS);
+            config.setCorePoolSize(threads).setMaxPoolSize(threads)
+                    .setKeepAliveTime(0L, TimeUnit.SECONDS);
+        } else if ("cached".equals(threadpool)) {
+            int threads = getUrl().getPositiveParameter(THREADS_KEY, Integer.MAX_VALUE);
+            config.setCorePoolSize(0).setMaxPoolSize(threads)
+                    .setKeepAliveTime(60L, TimeUnit.SECONDS);
+        } else {
+            throw new IllegalArgumentException("Unsupported threadpool type " + threadpool);
+        }
+        builder.setWorkerThreadPoolConfig(config)
+                .setKeepAlive(true)
+                .setReuseAddress(false)
+                .setIOStrategy(SameThreadIOStrategy.getInstance());
+        transport = builder.build();
+        transport.setProcessor(filterChainBuilder.build());
+        transport.bind(getBindAddress());
+        transport.start();
+    }
+
+    @Override
+    protected void doClose() throws Throwable {
+        try {
+            transport.stop();
+        } catch (Throwable e) {
+            logger.warn(e.getMessage(), e);
+        }
+    }
+
+    @Override
+    public boolean isBound() {
+        return !transport.isStopped();
+    }
+
+    @Override
+    public Collection<Channel> getChannels() {
+        return channels.values();
+    }
+
+    @Override
+    public Channel getChannel(InetSocketAddress remoteAddress) {
+        return channels.get(NetUtils.toAddressString(remoteAddress));
+    }
+
+    @Override
+    public void connected(Channel ch) throws RemotingException {
+        channels.put(NetUtils.toAddressString(ch.getRemoteAddress()), ch);
+        super.connected(ch);
+    }
+
+    @Override
+    public void disconnected(Channel ch) throws RemotingException {
+        channels.remove(NetUtils.toAddressString(ch.getRemoteAddress()));
+        super.disconnected(ch);
+    }
+
+}
diff --git a/dubbo-remoting-extensions/dubbo-remoting-grizzly/src/main/java/org/apache/dubbo/remoting/transport/grizzly/GrizzlyTransporter.java b/dubbo-remoting-extensions/dubbo-remoting-grizzly/src/main/java/org/apache/dubbo/remoting/transport/grizzly/GrizzlyTransporter.java
new file mode 100644
index 0000000..e0a4e59
--- /dev/null
+++ b/dubbo-remoting-extensions/dubbo-remoting-grizzly/src/main/java/org/apache/dubbo/remoting/transport/grizzly/GrizzlyTransporter.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dubbo.remoting.transport.grizzly;
+
+import org.apache.dubbo.common.URL;
+import org.apache.dubbo.remoting.ChannelHandler;
+import org.apache.dubbo.remoting.Client;
+import org.apache.dubbo.remoting.RemotingException;
+import org.apache.dubbo.remoting.RemotingServer;
+import org.apache.dubbo.remoting.Transporter;
+
+/**
+ * GrizzlyTransporter
+ */
+public class GrizzlyTransporter implements Transporter {
+
+    public static final String NAME = "grizzly";
+
+    @Override
+    public RemotingServer bind(URL url, ChannelHandler handler) throws RemotingException {
+        return new GrizzlyServer(url, handler);
+    }
+
+    @Override
+    public Client connect(URL url, ChannelHandler handler) throws RemotingException {
+        return new GrizzlyClient(url, handler);
+    }
+
+}
diff --git a/dubbo-remoting-extensions/dubbo-remoting-grizzly/src/main/resources/META-INF/dubbo/internal/org.apache.dubbo.remoting.Transporter b/dubbo-remoting-extensions/dubbo-remoting-grizzly/src/main/resources/META-INF/dubbo/internal/org.apache.dubbo.remoting.Transporter
new file mode 100644
index 0000000..d276e79
--- /dev/null
+++ b/dubbo-remoting-extensions/dubbo-remoting-grizzly/src/main/resources/META-INF/dubbo/internal/org.apache.dubbo.remoting.Transporter
@@ -0,0 +1 @@
+grizzly=org.apache.dubbo.remoting.transport.grizzly.GrizzlyTransporter
diff --git a/dubbo-remoting-extensions/dubbo-remoting-grizzly/src/test/java/org/apache/dubbo/remoting/transport/grizzly/GrizzlyTransporterTest.java b/dubbo-remoting-extensions/dubbo-remoting-grizzly/src/test/java/org/apache/dubbo/remoting/transport/grizzly/GrizzlyTransporterTest.java
new file mode 100644
index 0000000..dbd593e
--- /dev/null
+++ b/dubbo-remoting-extensions/dubbo-remoting-grizzly/src/test/java/org/apache/dubbo/remoting/transport/grizzly/GrizzlyTransporterTest.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dubbo.remoting.transport.grizzly;
+
+import org.apache.dubbo.common.URL;
+import org.apache.dubbo.common.utils.NetUtils;
+import org.apache.dubbo.remoting.RemotingServer;
+import org.apache.dubbo.remoting.transport.ChannelHandlerAdapter;
+
+import org.junit.jupiter.api.Test;
+
+import static org.apache.dubbo.remoting.Constants.BIND_PORT_KEY;
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.MatcherAssert.assertThat;
+
+public class GrizzlyTransporterTest {
+    @Test
+    public void shouldAbleToBindGrizzly() throws Exception {
+        int port = NetUtils.getAvailablePort();
+        URL url = new URL("telnet", "localhost", port,
+                new String[]{BIND_PORT_KEY, String.valueOf(port)});
+
+        RemotingServer server = new GrizzlyTransporter().bind(url, new ChannelHandlerAdapter());
+
+        assertThat(server.isBound(), is(true));
+    }
+}
diff --git a/dubbo-remoting-extensions/dubbo-remoting-mina/pom.xml b/dubbo-remoting-extensions/dubbo-remoting-mina/pom.xml
new file mode 100644
index 0000000..b3f1ec1
--- /dev/null
+++ b/dubbo-remoting-extensions/dubbo-remoting-mina/pom.xml
@@ -0,0 +1,51 @@
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements.  See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+  -->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+    <parent>
+        <artifactId>dubbo-remoting-extensions</artifactId>
+        <groupId>org.apache.dubbo.extensions</groupId>
+        <version>${revision}</version>
+        <relativePath>../pom.xml</relativePath>
+    </parent>
+
+    <modelVersion>4.0.0</modelVersion>
+    <version>1.0.0-SNAPSHOT</version>
+    <artifactId>dubbo-remoting-mina</artifactId>
+    <packaging>jar</packaging>
+    <name>${project.artifactId}</name>
+    <description>The mina remoting module of dubbo project</description>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.dubbo</groupId>
+            <artifactId>dubbo-remoting-api</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.mina</groupId>
+            <artifactId>mina-core</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.slf4j</groupId>
+            <artifactId>slf4j-api</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.dubbo</groupId>
+            <artifactId>dubbo-serialization-hessian2</artifactId>
+            <scope>test</scope>
+        </dependency>
+    </dependencies>
+</project>
diff --git a/dubbo-remoting-extensions/dubbo-remoting-mina/src/main/java/org/apache/dubbo/remoting/transport/mina/MinaChannel.java b/dubbo-remoting-extensions/dubbo-remoting-mina/src/main/java/org/apache/dubbo/remoting/transport/mina/MinaChannel.java
new file mode 100644
index 0000000..7229cb1
--- /dev/null
+++ b/dubbo-remoting-extensions/dubbo-remoting-mina/src/main/java/org/apache/dubbo/remoting/transport/mina/MinaChannel.java
@@ -0,0 +1,191 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dubbo.remoting.transport.mina;
+
+import org.apache.dubbo.common.URL;
+import org.apache.dubbo.common.logger.Logger;
+import org.apache.dubbo.common.logger.LoggerFactory;
+import org.apache.dubbo.remoting.ChannelHandler;
+import org.apache.dubbo.remoting.RemotingException;
+import org.apache.dubbo.remoting.transport.AbstractChannel;
+
+import org.apache.mina.common.IoSession;
+import org.apache.mina.common.WriteFuture;
+
+import java.net.InetSocketAddress;
+
+import static org.apache.dubbo.common.constants.CommonConstants.DEFAULT_TIMEOUT;
+import static org.apache.dubbo.common.constants.CommonConstants.TIMEOUT_KEY;
+
+/**
+ * MinaChannel
+ */
+final class MinaChannel extends AbstractChannel {
+
+    private static final Logger logger = LoggerFactory.getLogger(MinaChannel.class);
+
+    private static final String CHANNEL_KEY = MinaChannel.class.getName() + ".CHANNEL";
+
+    private final IoSession session;
+
+    private MinaChannel(IoSession session, URL url, ChannelHandler handler) {
+        super(url, handler);
+        if (session == null) {
+            throw new IllegalArgumentException("mina session == null");
+        }
+        this.session = session;
+    }
+
+    static MinaChannel getOrAddChannel(IoSession session, URL url, ChannelHandler handler) {
+        if (session == null) {
+            return null;
+        }
+        MinaChannel ret = (MinaChannel) session.getAttribute(CHANNEL_KEY);
+        if (ret == null) {
+            ret = new MinaChannel(session, url, handler);
+            if (session.isConnected()) {
+                MinaChannel old = (MinaChannel) session.setAttribute(CHANNEL_KEY, ret);
+                if (old != null) {
+                    session.setAttribute(CHANNEL_KEY, old);
+                    ret = old;
+                }
+            }
+        }
+        return ret;
+    }
+
+    static void removeChannelIfDisconnected(IoSession session) {
+        if (session != null && !session.isConnected()) {
+            session.removeAttribute(CHANNEL_KEY);
+        }
+    }
+
+    @Override
+    public InetSocketAddress getLocalAddress() {
+        return (InetSocketAddress) session.getLocalAddress();
+    }
+
+    @Override
+    public InetSocketAddress getRemoteAddress() {
+        return (InetSocketAddress) session.getRemoteAddress();
+    }
+
+    @Override
+    public boolean isConnected() {
+        return session.isConnected();
+    }
+
+    @Override
+    public void send(Object message, boolean sent) throws RemotingException {
+        super.send(message, sent);
+
+        boolean success = true;
+        int timeout = 0;
+        try {
+            WriteFuture future = session.write(message);
+            if (sent) {
+                timeout = getUrl().getPositiveParameter(TIMEOUT_KEY, DEFAULT_TIMEOUT);
+                success = future.join(timeout);
+            }
+        } catch (Throwable e) {
+            throw new RemotingException(this, "Failed to send message " + message + " to " + getRemoteAddress() + ", cause: " + e.getMessage(), e);
+        }
+
+        if (!success) {
+            throw new RemotingException(this, "Failed to send message " + message + " to " + getRemoteAddress()
+                    + "in timeout(" + timeout + "ms) limit");
+        }
+    }
+
+    @Override
+    public void close() {
+        try {
+            super.close();
+        } catch (Exception e) {
+            logger.warn(e.getMessage(), e);
+        }
+        try {
+            removeChannelIfDisconnected(session);
+        } catch (Exception e) {
+            logger.warn(e.getMessage(), e);
+        }
+        try {
+            if (logger.isInfoEnabled()) {
+                logger.info("CLose mina channel " + session);
+            }
+            session.close();
+        } catch (Exception e) {
+            logger.warn(e.getMessage(), e);
+        }
+    }
+
+    @Override
+    public boolean hasAttribute(String key) {
+        return session.containsAttribute(key);
+    }
+
+    @Override
+    public Object getAttribute(String key) {
+        return session.getAttribute(key);
+    }
+
+    @Override
+    public void setAttribute(String key, Object value) {
+        session.setAttribute(key, value);
+    }
+
+    @Override
+    public void removeAttribute(String key) {
+        session.removeAttribute(key);
+    }
+
+    @Override
+    public int hashCode() {
+        final int prime = 31;
+        int result = 1;
+        result = prime * result + ((session == null) ? 0 : session.hashCode());
+        return result;
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+        if (this == obj) {
+            return true;
+        }
+        if (obj == null) {
+            return false;
+        }
+        if (getClass() != obj.getClass()) {
+            return false;
+        }
+        MinaChannel other = (MinaChannel) obj;
+        if (session == null) {
+            if (other.session != null) {
+                return false;
+            }
+        } else if (!session.equals(other.session)) {
+            return false;
+        }
+        return true;
+    }
+
+    @Override
+    public String toString() {
+        return "MinaChannel [session=" + session + "]";
+    }
+
+}
diff --git a/dubbo-remoting-extensions/dubbo-remoting-mina/src/main/java/org/apache/dubbo/remoting/transport/mina/MinaClient.java b/dubbo-remoting-extensions/dubbo-remoting-mina/src/main/java/org/apache/dubbo/remoting/transport/mina/MinaClient.java
new file mode 100644
index 0000000..ad047a0
--- /dev/null
+++ b/dubbo-remoting-extensions/dubbo-remoting-mina/src/main/java/org/apache/dubbo/remoting/transport/mina/MinaClient.java
@@ -0,0 +1,174 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dubbo.remoting.transport.mina;
+
+import org.apache.dubbo.common.URL;
+import org.apache.dubbo.common.Version;
+import org.apache.dubbo.common.logger.Logger;
+import org.apache.dubbo.common.logger.LoggerFactory;
+import org.apache.dubbo.common.utils.NamedThreadFactory;
+import org.apache.dubbo.common.utils.NetUtils;
+import org.apache.dubbo.remoting.Channel;
+import org.apache.dubbo.remoting.ChannelHandler;
+import org.apache.dubbo.remoting.Constants;
+import org.apache.dubbo.remoting.RemotingException;
+import org.apache.dubbo.remoting.transport.AbstractClient;
+
+import org.apache.mina.common.ConnectFuture;
+import org.apache.mina.common.IoFuture;
+import org.apache.mina.common.IoFutureListener;
+import org.apache.mina.common.IoSession;
+import org.apache.mina.common.ThreadModel;
+import org.apache.mina.filter.codec.ProtocolCodecFilter;
+import org.apache.mina.transport.socket.nio.SocketConnector;
+import org.apache.mina.transport.socket.nio.SocketConnectorConfig;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+
+/**
+ * Mina client.
+ */
+public class MinaClient extends AbstractClient {
+
+    private static final Logger logger = LoggerFactory.getLogger(MinaClient.class);
+
+    private static final Map<String, SocketConnector> CONNECTORS = new ConcurrentHashMap<String, SocketConnector>();
+
+    private String connectorKey;
+
+    private SocketConnector connector;
+
+    private volatile IoSession session; // volatile, please copy reference to use
+
+    public MinaClient(final URL url, final ChannelHandler handler) throws RemotingException {
+        super(url, wrapChannelHandler(url, handler));
+    }
+
+    @Override
+    protected void doOpen() throws Throwable {
+        connectorKey = getUrl().toFullString();
+        SocketConnector c = CONNECTORS.get(connectorKey);
+        if (c != null) {
+            connector = c;
+        } else {
+            // set thread pool.
+            connector = new SocketConnector(Constants.DEFAULT_IO_THREADS,
+                    Executors.newCachedThreadPool(new NamedThreadFactory("MinaClientWorker", true)));
+            // config
+            SocketConnectorConfig cfg = (SocketConnectorConfig) connector.getDefaultConfig();
+            cfg.setThreadModel(ThreadModel.MANUAL);
+            cfg.getSessionConfig().setTcpNoDelay(true);
+            cfg.getSessionConfig().setKeepAlive(true);
+            int timeout = getConnectTimeout();
+            cfg.setConnectTimeout(timeout < 1000 ? 1 : timeout / 1000);
+            // set codec.
+            connector.getFilterChain().addLast("codec", new ProtocolCodecFilter(new MinaCodecAdapter(getCodec(), getUrl(), this)));
+            CONNECTORS.put(connectorKey, connector);
+        }
+    }
+
+    @Override
+    protected void doConnect() throws Throwable {
+        ConnectFuture future = connector.connect(getConnectAddress(), new MinaHandler(getUrl(), this));
+        long start = System.currentTimeMillis();
+        final AtomicReference<Throwable> exception = new AtomicReference<Throwable>();
+        final CountDownLatch finish = new CountDownLatch(1); // resolve future.awaitUninterruptibly() dead lock
+        future.addListener(new IoFutureListener() {
+            @Override
+            public void operationComplete(IoFuture future) {
+                try {
+                    if (future.isReady()) {
+                        IoSession newSession = future.getSession();
+                        try {
+                            // Close old channel
+                            IoSession oldSession = MinaClient.this.session; // copy reference
+                            if (oldSession != null) {
+                                try {
+                                    if (logger.isInfoEnabled()) {
+                                        logger.info("Close old mina channel " + oldSession + " on create new mina channel " + newSession);
+                                    }
+                                    oldSession.close();
+                                } finally {
+                                    MinaChannel.removeChannelIfDisconnected(oldSession);
+                                }
+                            }
+                        } finally {
+                            if (MinaClient.this.isClosed()) {
+                                try {
+                                    if (logger.isInfoEnabled()) {
+                                        logger.info("Close new mina channel " + newSession + ", because the client closed.");
+                                    }
+                                    newSession.close();
+                                } finally {
+                                    MinaClient.this.session = null;
+                                    MinaChannel.removeChannelIfDisconnected(newSession);
+                                }
+                            } else {
+                                MinaClient.this.session = newSession;
+                            }
+                        }
+                    }
+                } catch (Exception e) {
+                    exception.set(e);
+                } finally {
+                    finish.countDown();
+                }
+            }
+        });
+        try {
+            finish.await(getConnectTimeout(), TimeUnit.MILLISECONDS);
+        } catch (InterruptedException e) {
+            throw new RemotingException(this, "client(url: " + getUrl() + ") failed to connect to server " + getRemoteAddress() + " client-side timeout "
+                    + getConnectTimeout() + "ms (elapsed: " + (System.currentTimeMillis() - start)
+                    + "ms) from netty client " + NetUtils.getLocalHost() + " using dubbo version "
+                    + Version.getVersion() + ", cause: " + e.getMessage(), e);
+        }
+        Throwable e = exception.get();
+        if (e != null) {
+            throw e;
+        }
+    }
+
+    @Override
+    protected void doDisConnect() throws Throwable {
+        try {
+            MinaChannel.removeChannelIfDisconnected(session);
+        } catch (Throwable t) {
+            logger.warn(t.getMessage());
+        }
+    }
+
+    @Override
+    protected void doClose() throws Throwable {
+        //release mina resources.
+    }
+
+    @Override
+    protected Channel getChannel() {
+        IoSession s = session;
+        if (s == null || !s.isConnected()) {
+            return null;
+        }
+        return MinaChannel.getOrAddChannel(s, getUrl(), this);
+    }
+
+}
diff --git a/dubbo-remoting-extensions/dubbo-remoting-mina/src/main/java/org/apache/dubbo/remoting/transport/mina/MinaCodecAdapter.java b/dubbo-remoting-extensions/dubbo-remoting-mina/src/main/java/org/apache/dubbo/remoting/transport/mina/MinaCodecAdapter.java
new file mode 100644
index 0000000..7ce9253
--- /dev/null
+++ b/dubbo-remoting-extensions/dubbo-remoting-mina/src/main/java/org/apache/dubbo/remoting/transport/mina/MinaCodecAdapter.java
@@ -0,0 +1,167 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dubbo.remoting.transport.mina;
+
+import org.apache.dubbo.common.URL;
+import org.apache.dubbo.remoting.Channel;
+import org.apache.dubbo.remoting.ChannelHandler;
+import org.apache.dubbo.remoting.Codec2;
+import org.apache.dubbo.remoting.buffer.ChannelBuffer;
+import org.apache.dubbo.remoting.buffer.ChannelBuffers;
+import org.apache.dubbo.remoting.buffer.DynamicChannelBuffer;
+
+import org.apache.mina.common.ByteBuffer;
+import org.apache.mina.common.IoSession;
+import org.apache.mina.filter.codec.ProtocolCodecFactory;
+import org.apache.mina.filter.codec.ProtocolDecoder;
+import org.apache.mina.filter.codec.ProtocolDecoderOutput;
+import org.apache.mina.filter.codec.ProtocolEncoder;
+import org.apache.mina.filter.codec.ProtocolEncoderOutput;
+
+import static org.apache.dubbo.remoting.Constants.BUFFER_KEY;
+import static org.apache.dubbo.remoting.Constants.DEFAULT_BUFFER_SIZE;
+import static org.apache.dubbo.remoting.Constants.MAX_BUFFER_SIZE;
+import static org.apache.dubbo.remoting.Constants.MIN_BUFFER_SIZE;
+
+/**
+ * MinaCodecAdapter.
+ */
+final class MinaCodecAdapter implements ProtocolCodecFactory {
+
+    private final ProtocolEncoder encoder = new InternalEncoder();
+
+    private final ProtocolDecoder decoder = new InternalDecoder();
+
+    private final Codec2 codec;
+
+    private final URL url;
+
+    private final ChannelHandler handler;
+
+    private final int bufferSize;
+
+    public MinaCodecAdapter(Codec2 codec, URL url, ChannelHandler handler) {
+        this.codec = codec;
+        this.url = url;
+        this.handler = handler;
+        int b = url.getPositiveParameter(BUFFER_KEY, DEFAULT_BUFFER_SIZE);
+        this.bufferSize = b >= MIN_BUFFER_SIZE && b <= MAX_BUFFER_SIZE ? b : DEFAULT_BUFFER_SIZE;
+    }
+
+    @Override
+    public ProtocolEncoder getEncoder() {
+        return encoder;
+    }
+
+    @Override
+    public ProtocolDecoder getDecoder() {
+        return decoder;
+    }
+
+    private class InternalEncoder implements ProtocolEncoder {
+
+        @Override
+        public void dispose(IoSession session) throws Exception {
+        }
+
+        @Override
+        public void encode(IoSession session, Object msg, ProtocolEncoderOutput out) throws Exception {
+            ChannelBuffer buffer = ChannelBuffers.dynamicBuffer(1024);
+            MinaChannel channel = MinaChannel.getOrAddChannel(session, url, handler);
+            try {
+                codec.encode(channel, buffer, msg);
+            } finally {
+                MinaChannel.removeChannelIfDisconnected(session);
+            }
+            out.write(ByteBuffer.wrap(buffer.toByteBuffer()));
+            out.flush();
+        }
+    }
+
+    private class InternalDecoder implements ProtocolDecoder {
+
+        private ChannelBuffer buffer = ChannelBuffers.EMPTY_BUFFER;
+
+        @Override
+        public void decode(IoSession session, ByteBuffer in, ProtocolDecoderOutput out) throws Exception {
+            int readable = in.limit();
+            if (readable <= 0) {
+                return;
+            }
+
+            ChannelBuffer frame;
+
+            if (buffer.readable()) {
+                if (buffer instanceof DynamicChannelBuffer) {
+                    buffer.writeBytes(in.buf());
+                    frame = buffer;
+                } else {
+                    int size = buffer.readableBytes() + in.remaining();
+                    frame = ChannelBuffers.dynamicBuffer(size > bufferSize ? size : bufferSize);
+                    frame.writeBytes(buffer, buffer.readableBytes());
+                    frame.writeBytes(in.buf());
+                }
+            } else {
+                frame = ChannelBuffers.wrappedBuffer(in.buf());
+            }
+
+            Channel channel = MinaChannel.getOrAddChannel(session, url, handler);
+            Object msg;
+            int savedReadIndex;
+
+            try {
+                do {
+                    savedReadIndex = frame.readerIndex();
+                    try {
+                        msg = codec.decode(channel, frame);
+                    } catch (Exception e) {
+                        buffer = ChannelBuffers.EMPTY_BUFFER;
+                        throw e;
+                    }
+                    if (msg == Codec2.DecodeResult.NEED_MORE_INPUT) {
+                        frame.readerIndex(savedReadIndex);
+                        break;
+                    } else {
+                        if (savedReadIndex == frame.readerIndex()) {
+                            buffer = ChannelBuffers.EMPTY_BUFFER;
+                            throw new Exception("Decode without read data.");
+                        }
+                        if (msg != null) {
+                            out.write(msg);
+                        }
+                    }
+                } while (frame.readable());
+            } finally {
+                if (frame.readable()) {
+                    frame.discardReadBytes();
+                    buffer = frame;
+                } else {
+                    buffer = ChannelBuffers.EMPTY_BUFFER;
+                }
+                MinaChannel.removeChannelIfDisconnected(session);
+            }
+        }
+
+        @Override
+        public void dispose(IoSession session) throws Exception {
+        }
+
+        @Override
+        public void finishDecode(IoSession session, ProtocolDecoderOutput out) throws Exception {
+        }
+    }
+}
diff --git a/dubbo-remoting-extensions/dubbo-remoting-mina/src/main/java/org/apache/dubbo/remoting/transport/mina/MinaHandler.java b/dubbo-remoting-extensions/dubbo-remoting-mina/src/main/java/org/apache/dubbo/remoting/transport/mina/MinaHandler.java
new file mode 100644
index 0000000..3f6ac5e
--- /dev/null
+++ b/dubbo-remoting-extensions/dubbo-remoting-mina/src/main/java/org/apache/dubbo/remoting/transport/mina/MinaHandler.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dubbo.remoting.transport.mina;
+
+import org.apache.dubbo.common.URL;
+import org.apache.dubbo.remoting.ChannelHandler;
+
+import org.apache.mina.common.IoHandlerAdapter;
+import org.apache.mina.common.IoSession;
+
+/**
+ * MinaHandler
+ */
+public class MinaHandler extends IoHandlerAdapter {
+
+    private final URL url;
+
+    private final ChannelHandler handler;
+
+    public MinaHandler(URL url, ChannelHandler handler) {
+        if (url == null) {
+            throw new IllegalArgumentException("url == null");
+        }
+        if (handler == null) {
+            throw new IllegalArgumentException("handler == null");
+        }
+        this.url = url;
+        this.handler = handler;
+    }
+
+    @Override
+    public void sessionOpened(IoSession session) throws Exception {
+        MinaChannel channel = MinaChannel.getOrAddChannel(session, url, handler);
+        try {
+            handler.connected(channel);
+        } finally {
+            MinaChannel.removeChannelIfDisconnected(session);
+        }
+    }
+
+    @Override
+    public void sessionClosed(IoSession session) throws Exception {
+        MinaChannel channel = MinaChannel.getOrAddChannel(session, url, handler);
+        try {
+            handler.disconnected(channel);
+        } finally {
+            MinaChannel.removeChannelIfDisconnected(session);
+        }
+    }
+
+    @Override
+    public void messageReceived(IoSession session, Object message) throws Exception {
+        MinaChannel channel = MinaChannel.getOrAddChannel(session, url, handler);
+        try {
+            handler.received(channel, message);
+        } finally {
+            MinaChannel.removeChannelIfDisconnected(session);
+        }
+    }
+
+    @Override
+    public void messageSent(IoSession session, Object message) throws Exception {
+        MinaChannel channel = MinaChannel.getOrAddChannel(session, url, handler);
+        try {
+            handler.sent(channel, message);
+        } finally {
+            MinaChannel.removeChannelIfDisconnected(session);
+        }
+    }
+
+    @Override
+    public void exceptionCaught(IoSession session, Throwable cause) throws Exception {
+        MinaChannel channel = MinaChannel.getOrAddChannel(session, url, handler);
+        try {
+            handler.caught(channel, cause);
+        } finally {
+            MinaChannel.removeChannelIfDisconnected(session);
+        }
+    }
+
+}
diff --git a/dubbo-remoting-extensions/dubbo-remoting-mina/src/main/java/org/apache/dubbo/remoting/transport/mina/MinaServer.java b/dubbo-remoting-extensions/dubbo-remoting-mina/src/main/java/org/apache/dubbo/remoting/transport/mina/MinaServer.java
new file mode 100644
index 0000000..f88b967
--- /dev/null
+++ b/dubbo-remoting-extensions/dubbo-remoting-mina/src/main/java/org/apache/dubbo/remoting/transport/mina/MinaServer.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dubbo.remoting.transport.mina;
+
+import org.apache.dubbo.common.URL;
+import org.apache.dubbo.common.logger.Logger;
+import org.apache.dubbo.common.logger.LoggerFactory;
+import org.apache.dubbo.common.utils.ExecutorUtil;
+import org.apache.dubbo.common.utils.NamedThreadFactory;
+import org.apache.dubbo.remoting.Channel;
+import org.apache.dubbo.remoting.ChannelHandler;
+import org.apache.dubbo.remoting.RemotingException;
+import org.apache.dubbo.remoting.transport.AbstractServer;
+import org.apache.dubbo.remoting.transport.dispatcher.ChannelHandlers;
+
+import org.apache.mina.common.IoSession;
+import org.apache.mina.common.ThreadModel;
+import org.apache.mina.filter.codec.ProtocolCodecFilter;
+import org.apache.mina.transport.socket.nio.SocketAcceptor;
+import org.apache.mina.transport.socket.nio.SocketAcceptorConfig;
+
+import java.net.InetSocketAddress;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.concurrent.Executors;
+
+import static org.apache.dubbo.common.constants.CommonConstants.IO_THREADS_KEY;
+import static org.apache.dubbo.remoting.Constants.DEFAULT_IO_THREADS;
+
+/**
+ * MinaServer
+ */
+public class MinaServer extends AbstractServer {
+
+    private static final Logger logger = LoggerFactory.getLogger(MinaServer.class);
+
+    private SocketAcceptor acceptor;
+
+    public MinaServer(URL url, ChannelHandler handler) throws RemotingException {
+        super(url, ChannelHandlers.wrap(handler, ExecutorUtil.setThreadName(url, SERVER_THREAD_POOL_NAME)));
+    }
+
+    @Override
+    protected void doOpen() throws Throwable {
+        // set thread pool.
+        acceptor = new SocketAcceptor(getUrl().getPositiveParameter(IO_THREADS_KEY, DEFAULT_IO_THREADS),
+                Executors.newCachedThreadPool(new NamedThreadFactory("MinaServerWorker",
+                        true)));
+        // config
+        SocketAcceptorConfig cfg = acceptor.getDefaultConfig();
+        cfg.setThreadModel(ThreadModel.MANUAL);
+        // set codec.
+        acceptor.getFilterChain().addLast("codec", new ProtocolCodecFilter(new MinaCodecAdapter(getCodec(), getUrl(), this)));
+
+        acceptor.bind(getBindAddress(), new MinaHandler(getUrl(), this));
+    }
+
+    @Override
+    protected void doClose() throws Throwable {
+        try {
+            if (acceptor != null) {
+                acceptor.unbind(getBindAddress());
+            }
+        } catch (Throwable e) {
+            logger.warn(e.getMessage(), e);
+        }
+    }
+
+    @Override
+    public Collection<Channel> getChannels() {
+        Set<IoSession> sessions = acceptor.getManagedSessions(getBindAddress());
+        Collection<Channel> channels = new HashSet<Channel>();
+        for (IoSession session : sessions) {
+            if (session.isConnected()) {
+                channels.add(MinaChannel.getOrAddChannel(session, getUrl(), this));
+            }
+        }
+        return channels;
+    }
+
+    @Override
+    public Channel getChannel(InetSocketAddress remoteAddress) {
+        Set<IoSession> sessions = acceptor.getManagedSessions(getBindAddress());
+        for (IoSession session : sessions) {
+            if (session.getRemoteAddress().equals(remoteAddress)) {
+                return MinaChannel.getOrAddChannel(session, getUrl(), this);
+            }
+        }
+        return null;
+    }
+
+    @Override
+    public boolean isBound() {
+        return acceptor.isManaged(getBindAddress());
+    }
+
+}
diff --git a/dubbo-remoting-extensions/dubbo-remoting-mina/src/main/java/org/apache/dubbo/remoting/transport/mina/MinaTransporter.java b/dubbo-remoting-extensions/dubbo-remoting-mina/src/main/java/org/apache/dubbo/remoting/transport/mina/MinaTransporter.java
new file mode 100644
index 0000000..ae2b190
--- /dev/null
+++ b/dubbo-remoting-extensions/dubbo-remoting-mina/src/main/java/org/apache/dubbo/remoting/transport/mina/MinaTransporter.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dubbo.remoting.transport.mina;
+
+import org.apache.dubbo.common.URL;
+import org.apache.dubbo.remoting.ChannelHandler;
+import org.apache.dubbo.remoting.Client;
+import org.apache.dubbo.remoting.RemotingException;
+import org.apache.dubbo.remoting.RemotingServer;
+import org.apache.dubbo.remoting.Transporter;
+
+public class MinaTransporter implements Transporter {
+
+    public static final String NAME = "mina";
+
+    @Override
+    public RemotingServer bind(URL url, ChannelHandler handler) throws RemotingException {
+        return new MinaServer(url, handler);
+    }
+
+    @Override
+    public Client connect(URL url, ChannelHandler handler) throws RemotingException {
+        return new MinaClient(url, handler);
+    }
+
+}
diff --git a/dubbo-remoting-extensions/dubbo-remoting-mina/src/main/resources/META-INF/dubbo/internal/org.apache.dubbo.remoting.Transporter b/dubbo-remoting-extensions/dubbo-remoting-mina/src/main/resources/META-INF/dubbo/internal/org.apache.dubbo.remoting.Transporter
new file mode 100644
index 0000000..70b87ac
--- /dev/null
+++ b/dubbo-remoting-extensions/dubbo-remoting-mina/src/main/resources/META-INF/dubbo/internal/org.apache.dubbo.remoting.Transporter
@@ -0,0 +1 @@
+mina=org.apache.dubbo.remoting.transport.mina.MinaTransporter
diff --git a/dubbo-remoting-extensions/dubbo-remoting-mina/src/test/java/org/apache/remoting/transport/mina/ClientToServerTest.java b/dubbo-remoting-extensions/dubbo-remoting-mina/src/test/java/org/apache/remoting/transport/mina/ClientToServerTest.java
new file mode 100644
index 0000000..d6fd957
--- /dev/null
+++ b/dubbo-remoting-extensions/dubbo-remoting-mina/src/test/java/org/apache/remoting/transport/mina/ClientToServerTest.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.remoting.transport.mina;
+
+import org.apache.dubbo.common.utils.NetUtils;
+import org.apache.dubbo.remoting.RemotingException;
+import org.apache.dubbo.remoting.exchange.ExchangeChannel;
+import org.apache.dubbo.remoting.exchange.ExchangeServer;
+import org.apache.dubbo.remoting.exchange.support.Replier;
+
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * ClientToServer
+ */
+public abstract class ClientToServerTest {
+
+    protected static final String LOCALHOST = "127.0.0.1";
+
+    protected ExchangeServer server;
+
+    protected ExchangeChannel client;
+
+    protected WorldHandler handler = new WorldHandler();
+
+    protected abstract ExchangeServer newServer(int port, Replier<?> receiver) throws RemotingException;
+
+    protected abstract ExchangeChannel newClient(int port) throws RemotingException;
+
+    @BeforeEach
+    protected void setUp() throws Exception {
+        int port = NetUtils.getAvailablePort();
+        server = newServer(port, handler);
+        client = newClient(port);
+    }
+
+    @AfterEach
+    protected void tearDown() {
+        try {
+            if (server != null)
+                server.close();
+        } finally {
+            if (client != null)
+                client.close();
+        }
+    }
+
+    @Test
+    public void testFuture() throws Exception {
+        CompletableFuture<Object> future = client.request(new World("world"));
+        Hello result = (Hello) future.get();
+        Assertions.assertEquals("hello,world", result.getName());
+    }
+
+//    @Test
+//    public void testCallback() throws Exception {
+//        final Object waitter = new Object();
+//        client.invoke(new World("world"), new InvokeCallback<Hello>() {
+//            public void callback(Hello result) {
+//                Assertions.assertEquals("hello,world", result.getName());
+//                synchronized (waitter) {
+//                    waitter.notifyAll();
+//                }
+//            }
+//            public void onException(Throwable exception) {
+//            }
+//        });
+//        synchronized (waitter) {
+//            waitter.wait();
+//        }
+//    }
+
+}
diff --git a/dubbo-remoting-extensions/dubbo-remoting-mina/src/test/java/org/apache/remoting/transport/mina/ClientsTest.java b/dubbo-remoting-extensions/dubbo-remoting-mina/src/test/java/org/apache/remoting/transport/mina/ClientsTest.java
new file mode 100644
index 0000000..078f92d
--- /dev/null
+++ b/dubbo-remoting-extensions/dubbo-remoting-mina/src/test/java/org/apache/remoting/transport/mina/ClientsTest.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.remoting.transport.mina;
+
+import org.apache.dubbo.common.extension.ExtensionLoader;
+import org.apache.dubbo.remoting.Transporter;
+import org.apache.dubbo.remoting.transport.mina.MinaTransporter;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.containsString;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.fail;
+
+public class ClientsTest {
+
+    @Test
+    public void testGetTransportEmpty() {
+        try {
+            ExtensionLoader.getExtensionLoader(Transporter.class).getExtension("");
+            fail();
+        } catch (IllegalArgumentException expected) {
+            assertThat(expected.getMessage(), containsString("Extension name == null"));
+        }
+    }
+
+    @Test
+    public void testGetTransportNull() {
+        Assertions.assertThrows(IllegalArgumentException.class, () -> {
+            String name = null;
+            ExtensionLoader.getExtensionLoader(Transporter.class).getExtension(name);
+        });
+    }
+
+    @Test
+    public void testGetTransport1() {
+        String name = "mina";
+        assertEquals(MinaTransporter.class, ExtensionLoader.getExtensionLoader(Transporter.class).getExtension(name).getClass());
+    }
+
+    @Test
+    public void testGetTransportWrong() {
+        Assertions.assertThrows(IllegalStateException.class, () -> {
+            String name = "nety";
+            assertNull(ExtensionLoader.getExtensionLoader(Transporter.class).getExtension(name).getClass());
+        });
+    }
+}
diff --git a/dubbo-remoting-extensions/dubbo-remoting-mina/src/test/java/org/apache/remoting/transport/mina/Hello.java b/dubbo-remoting-extensions/dubbo-remoting-mina/src/test/java/org/apache/remoting/transport/mina/Hello.java
new file mode 100644
index 0000000..15ae552
--- /dev/null
+++ b/dubbo-remoting-extensions/dubbo-remoting-mina/src/test/java/org/apache/remoting/transport/mina/Hello.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.remoting.transport.mina;
+
+import java.io.Serializable;
+
+/**
+ * Result
+ */
+public class Hello implements Serializable {
+
+    private static final long serialVersionUID = 8563900571013747774L;
+
+    private String name;
+
+    public Hello() {
+    }
+
+    public Hello(String name) {
+        this.name = name;
+    }
+
+    public String getName() {
+        return name;
+    }
+
+    public void setName(String name) {
+        this.name = name;
+    }
+
+}
diff --git a/dubbo-remoting-extensions/dubbo-remoting-mina/src/test/java/org/apache/remoting/transport/mina/MinaClientToServerTest.java b/dubbo-remoting-extensions/dubbo-remoting-mina/src/test/java/org/apache/remoting/transport/mina/MinaClientToServerTest.java
new file mode 100644
index 0000000..d1c0525
--- /dev/null
+++ b/dubbo-remoting-extensions/dubbo-remoting-mina/src/test/java/org/apache/remoting/transport/mina/MinaClientToServerTest.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.remoting.transport.mina;
+
+import org.apache.dubbo.common.URL;
+import org.apache.dubbo.remoting.RemotingException;
+import org.apache.dubbo.remoting.exchange.ExchangeChannel;
+import org.apache.dubbo.remoting.exchange.ExchangeServer;
+import org.apache.dubbo.remoting.exchange.Exchangers;
+import org.apache.dubbo.remoting.exchange.support.Replier;
+
+/**
+ * MinaServerClientTest
+ */
+public class MinaClientToServerTest extends ClientToServerTest {
+
+    @Override
+    protected ExchangeServer newServer(int port, Replier<?> receiver) throws RemotingException {
+        return Exchangers.bind(URL.valueOf("exchange://localhost:" + port + "?server=mina"), receiver);
+    }
+
+    @Override
+    protected ExchangeChannel newClient(int port) throws RemotingException {
+        return Exchangers.connect(URL.valueOf("exchange://localhost:" + port + "?client=mina&timeout=3000"));
+    }
+
+}
diff --git a/dubbo-remoting-extensions/dubbo-remoting-mina/src/test/java/org/apache/remoting/transport/mina/World.java b/dubbo-remoting-extensions/dubbo-remoting-mina/src/test/java/org/apache/remoting/transport/mina/World.java
new file mode 100644
index 0000000..1f64d96
--- /dev/null
+++ b/dubbo-remoting-extensions/dubbo-remoting-mina/src/test/java/org/apache/remoting/transport/mina/World.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.remoting.transport.mina;
+
+import java.io.Serializable;
+
+/**
+ * Data
+ */
+public class World implements Serializable {
+
+    private static final long serialVersionUID = 8563900571013747774L;
+
+    private String name;
+
+    public World() {
+    }
+
+    public World(String name) {
+        this.name = name;
+    }
+
+    public String getName() {
+        return name;
+    }
+
+    public void setName(String name) {
+        this.name = name;
+    }
+
+}
diff --git a/dubbo-remoting-extensions/dubbo-remoting-mina/src/test/java/org/apache/remoting/transport/mina/WorldHandler.java b/dubbo-remoting-extensions/dubbo-remoting-mina/src/test/java/org/apache/remoting/transport/mina/WorldHandler.java
new file mode 100644
index 0000000..9a328b6
--- /dev/null
+++ b/dubbo-remoting-extensions/dubbo-remoting-mina/src/test/java/org/apache/remoting/transport/mina/WorldHandler.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.remoting.transport.mina;
+
+import org.apache.dubbo.remoting.RemotingException;
+import org.apache.dubbo.remoting.exchange.ExchangeChannel;
+import org.apache.dubbo.remoting.exchange.support.Replier;
+
+/**
+ * DataHandler
+ */
+public class WorldHandler implements Replier<World> {
+
+    public Class<World> interest() {
+        return World.class;
+    }
+
+    public Object reply(ExchangeChannel channel, World msg) throws RemotingException {
+        return new Hello("hello," + msg.getName());
+    }
+
+}
diff --git a/dubbo-remoting-extensions/pom.xml b/dubbo-remoting-extensions/dubbo-remoting-p2p/pom.xml
similarity index 54%
copy from dubbo-remoting-extensions/pom.xml
copy to dubbo-remoting-extensions/dubbo-remoting-p2p/pom.xml
index 6a6b72a..08a49a7 100644
--- a/dubbo-remoting-extensions/pom.xml
+++ b/dubbo-remoting-extensions/dubbo-remoting-p2p/pom.xml
@@ -1,4 +1,3 @@
-<?xml version="1.0" encoding="UTF-8"?>
 <!--
   Licensed to the Apache Software Foundation (ASF) under one or more
   contributor license agreements.  See the NOTICE file distributed with
@@ -15,24 +14,30 @@
   See the License for the specific language governing permissions and
   limitations under the License.
   -->
-<project xmlns="http://maven.apache.org/POM/4.0.0"
-         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
-         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
     <parent>
+        <artifactId>dubbo-remoting-extensions</artifactId>
         <groupId>org.apache.dubbo.extensions</groupId>
-        <artifactId>extensions-parent</artifactId>
         <version>${revision}</version>
         <relativePath>../pom.xml</relativePath>
     </parent>
-    <modelVersion>4.0.0</modelVersion>
-
-    <artifactId>dubbo-remoting-extensions</artifactId>
-    <version>${revision}</version>
-    <packaging>pom</packaging>
-
-    <modules>
-        <module>dubbo-remoting-quic</module>
-    </modules>
 
+    <modelVersion>4.0.0</modelVersion>
+    <version>1.0.0-SNAPSHOT</version>
+    <artifactId>dubbo-remoting-p2p</artifactId>
+    <packaging>jar</packaging>
+    <name>${project.artifactId}</name>
+    <description>The p2p remoting module of dubbo project</description>
 
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.dubbo</groupId>
+            <artifactId>dubbo-remoting-api</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.dubbo</groupId>
+            <artifactId>dubbo-remoting-netty4</artifactId>
+            <scope>test</scope>
+        </dependency>
+    </dependencies>
 </project>
diff --git a/dubbo-remoting-extensions/dubbo-remoting-p2p/src/main/java/org/apache/dubbo/remoting/p2p/Group.java b/dubbo-remoting-extensions/dubbo-remoting-p2p/src/main/java/org/apache/dubbo/remoting/p2p/Group.java
new file mode 100644
index 0000000..1c335e2
--- /dev/null
+++ b/dubbo-remoting-extensions/dubbo-remoting-p2p/src/main/java/org/apache/dubbo/remoting/p2p/Group.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dubbo.remoting.p2p;
+
+import org.apache.dubbo.common.URL;
+import org.apache.dubbo.remoting.ChannelHandler;
+import org.apache.dubbo.remoting.RemotingException;
+
+/**
+ * Group. (SPI, Prototype, ThreadSafe)
+ * <p>
+ * <a href="http://en.wikipedia.org/wiki/Peer-to-peer">Peer-to-peer</a>
+ */
+public interface Group {
+
+    /**
+     * get group url.
+     *
+     * @return group url.
+     */
+    URL getUrl();
+
+    /**
+     * join.
+     *
+     * @param url
+     */
+    Peer join(URL url, ChannelHandler handler) throws RemotingException;
+
+    /**
+     * leave.
+     *
+     * @param url
+     * @throws RemotingException
+     */
+    void leave(URL url) throws RemotingException;
+
+    /**
+     * close the group.
+     */
+    void close();
+
+}
diff --git a/dubbo-remoting-extensions/dubbo-remoting-p2p/src/main/java/org/apache/dubbo/remoting/p2p/Networker.java b/dubbo-remoting-extensions/dubbo-remoting-p2p/src/main/java/org/apache/dubbo/remoting/p2p/Networker.java
new file mode 100644
index 0000000..c4801cd
--- /dev/null
+++ b/dubbo-remoting-extensions/dubbo-remoting-p2p/src/main/java/org/apache/dubbo/remoting/p2p/Networker.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dubbo.remoting.p2p;
+
+import org.apache.dubbo.common.URL;
+import org.apache.dubbo.common.extension.SPI;
+import org.apache.dubbo.remoting.RemotingException;
+
+/**
+ * Networker. (SPI, Singleton, ThreadSafe)
+ * <p>
+ * <a href="http://en.wikipedia.org/wiki/Peer-to-peer">Peer-to-peer</a>
+ */
+@SPI
+public interface Networker {
+
+    /**
+     * lookup group.
+     *
+     * @param url group url
+     * @return group.
+     */
+    Group lookup(URL url) throws RemotingException;
+
+}
diff --git a/dubbo-remoting-extensions/dubbo-remoting-p2p/src/main/java/org/apache/dubbo/remoting/p2p/Networkers.java b/dubbo-remoting-extensions/dubbo-remoting-p2p/src/main/java/org/apache/dubbo/remoting/p2p/Networkers.java
new file mode 100644
index 0000000..980c3d4
--- /dev/null
+++ b/dubbo-remoting-extensions/dubbo-remoting-p2p/src/main/java/org/apache/dubbo/remoting/p2p/Networkers.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dubbo.remoting.p2p;
+
+import org.apache.dubbo.common.URL;
+import org.apache.dubbo.common.extension.ExtensionLoader;
+import org.apache.dubbo.remoting.ChannelHandler;
+import org.apache.dubbo.remoting.RemotingException;
+
+/**
+ * Networkers. (API, Static, ThreadSafe)
+ * <p>
+ * <a href="http://en.wikipedia.org/wiki/Peer-to-peer">Peer-to-peer</a>
+ */
+public class Networkers {
+
+    public static Peer join(String group, String peer, ChannelHandler handler) throws RemotingException {
+        return join(URL.valueOf(group), URL.valueOf(peer), handler);
+    }
+
+    public static Peer join(URL group, URL peer, ChannelHandler handler) throws RemotingException {
+        return lookup(group).join(peer, handler);
+    }
+
+    public static Group lookup(String group) throws RemotingException {
+        return lookup(URL.valueOf(group));
+    }
+
+    public static Group lookup(URL group) throws RemotingException {
+        Networker networker = ExtensionLoader.getExtensionLoader(Networker.class).getExtension(group.getProtocol());
+        return networker.lookup(group);
+    }
+}
diff --git a/dubbo-remoting-extensions/dubbo-remoting-p2p/src/main/java/org/apache/dubbo/remoting/p2p/Peer.java b/dubbo-remoting-extensions/dubbo-remoting-p2p/src/main/java/org/apache/dubbo/remoting/p2p/Peer.java
new file mode 100644
index 0000000..474f795
--- /dev/null
+++ b/dubbo-remoting-extensions/dubbo-remoting-p2p/src/main/java/org/apache/dubbo/remoting/p2p/Peer.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dubbo.remoting.p2p;
+
+import org.apache.dubbo.remoting.RemotingException;
+import org.apache.dubbo.remoting.RemotingServer;
+
+/**
+ * Peer. (SPI, Prototype, ThreadSafe)
+ * <p>
+ * <a href="http://en.wikipedia.org/wiki/Peer-to-peer">Peer-to-peer</a>
+ */
+public interface Peer extends RemotingServer {
+
+    /**
+     * leave.
+     *
+     * @throws RemotingException
+     */
+    void leave() throws RemotingException;
+
+}
diff --git a/dubbo-remoting-extensions/dubbo-remoting-p2p/src/main/java/org/apache/dubbo/remoting/p2p/exchange/ExchangeGroup.java b/dubbo-remoting-extensions/dubbo-remoting-p2p/src/main/java/org/apache/dubbo/remoting/p2p/exchange/ExchangeGroup.java
new file mode 100644
index 0000000..7fe21ef
--- /dev/null
+++ b/dubbo-remoting-extensions/dubbo-remoting-p2p/src/main/java/org/apache/dubbo/remoting/p2p/exchange/ExchangeGroup.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dubbo.remoting.p2p.exchange;
+
+import org.apache.dubbo.common.URL;
+import org.apache.dubbo.remoting.RemotingException;
+import org.apache.dubbo.remoting.exchange.ExchangeHandler;
+import org.apache.dubbo.remoting.p2p.Group;
+
+/**
+ * Group
+ */
+public interface ExchangeGroup extends Group {
+
+    /**
+     * join.
+     *
+     * @param url
+     */
+    ExchangePeer join(URL url, ExchangeHandler handler) throws RemotingException;
+
+}
diff --git a/dubbo-remoting-extensions/dubbo-remoting-p2p/src/main/java/org/apache/dubbo/remoting/p2p/exchange/ExchangeNetworker.java b/dubbo-remoting-extensions/dubbo-remoting-p2p/src/main/java/org/apache/dubbo/remoting/p2p/exchange/ExchangeNetworker.java
new file mode 100644
index 0000000..ac77d68
--- /dev/null
+++ b/dubbo-remoting-extensions/dubbo-remoting-p2p/src/main/java/org/apache/dubbo/remoting/p2p/exchange/ExchangeNetworker.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dubbo.remoting.p2p.exchange;
+
+import org.apache.dubbo.common.URL;
+import org.apache.dubbo.remoting.RemotingException;
+
+/**
+ * Networker
+ */
+public interface ExchangeNetworker {
+
+    /**
+     * lookup group.
+     *
+     * @param url group url
+     * @return group.
+     */
+    ExchangeGroup lookup(URL url) throws RemotingException;
+
+}
diff --git a/dubbo-remoting-extensions/dubbo-remoting-p2p/src/main/java/org/apache/dubbo/remoting/p2p/exchange/ExchangeNetworkers.java b/dubbo-remoting-extensions/dubbo-remoting-p2p/src/main/java/org/apache/dubbo/remoting/p2p/exchange/ExchangeNetworkers.java
new file mode 100644
index 0000000..0c047b7
--- /dev/null
+++ b/dubbo-remoting-extensions/dubbo-remoting-p2p/src/main/java/org/apache/dubbo/remoting/p2p/exchange/ExchangeNetworkers.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dubbo.remoting.p2p.exchange;
+
+import org.apache.dubbo.common.URL;
+import org.apache.dubbo.common.extension.ExtensionLoader;
+import org.apache.dubbo.remoting.RemotingException;
+import org.apache.dubbo.remoting.exchange.ExchangeHandler;
+
+/**
+ * Networkers
+ */
+public class ExchangeNetworkers {
+
+    public static ExchangePeer join(String group, String peer, ExchangeHandler handler) throws RemotingException {
+        return join(URL.valueOf(group), URL.valueOf(peer), handler);
+    }
+
+    public static ExchangePeer join(URL group, URL peer, ExchangeHandler handler) throws RemotingException {
+        return lookup(group).join(peer, handler);
+    }
+
+    public static ExchangeGroup lookup(String group) throws RemotingException {
+        return lookup(URL.valueOf(group));
+    }
+
+    public static ExchangeGroup lookup(URL group) throws RemotingException {
+        ExchangeNetworker networker = ExtensionLoader.getExtensionLoader(ExchangeNetworker.class).getExtension(group.getProtocol());
+        return networker.lookup(group);
+    }
+}
diff --git a/dubbo-remoting-extensions/dubbo-remoting-p2p/src/main/java/org/apache/dubbo/remoting/p2p/exchange/ExchangePeer.java b/dubbo-remoting-extensions/dubbo-remoting-p2p/src/main/java/org/apache/dubbo/remoting/p2p/exchange/ExchangePeer.java
new file mode 100644
index 0000000..a9f2419
--- /dev/null
+++ b/dubbo-remoting-extensions/dubbo-remoting-p2p/src/main/java/org/apache/dubbo/remoting/p2p/exchange/ExchangePeer.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dubbo.remoting.p2p.exchange;
+
+import org.apache.dubbo.remoting.exchange.ExchangeServer;
+import org.apache.dubbo.remoting.p2p.Peer;
+
+/**
+ * Peer
+ */
+public interface ExchangePeer extends Peer, ExchangeServer {
+}
diff --git a/dubbo-remoting-extensions/dubbo-remoting-p2p/src/main/java/org/apache/dubbo/remoting/p2p/exchange/support/AbstractExchangeGroup.java b/dubbo-remoting-extensions/dubbo-remoting-p2p/src/main/java/org/apache/dubbo/remoting/p2p/exchange/support/AbstractExchangeGroup.java
new file mode 100644
index 0000000..37d81a2
--- /dev/null
+++ b/dubbo-remoting-extensions/dubbo-remoting-p2p/src/main/java/org/apache/dubbo/remoting/p2p/exchange/support/AbstractExchangeGroup.java
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dubbo.remoting.p2p.exchange.support;
+
+import org.apache.dubbo.common.URL;
+import org.apache.dubbo.common.logger.Logger;
+import org.apache.dubbo.common.logger.LoggerFactory;
+import org.apache.dubbo.remoting.ChannelHandler;
+import org.apache.dubbo.remoting.Client;
+import org.apache.dubbo.remoting.RemotingException;
+import org.apache.dubbo.remoting.RemotingServer;
+import org.apache.dubbo.remoting.exchange.ExchangeClient;
+import org.apache.dubbo.remoting.exchange.ExchangeHandler;
+import org.apache.dubbo.remoting.exchange.ExchangeServer;
+import org.apache.dubbo.remoting.exchange.Exchangers;
+import org.apache.dubbo.remoting.exchange.support.ExchangeHandlerDispatcher;
+import org.apache.dubbo.remoting.p2p.Peer;
+import org.apache.dubbo.remoting.p2p.exchange.ExchangeGroup;
+import org.apache.dubbo.remoting.p2p.exchange.ExchangePeer;
+
+import java.util.ArrayList;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * AbstractGroup
+ */
+public abstract class AbstractExchangeGroup implements ExchangeGroup {
+
+    // log  output
+    protected static final Logger logger = LoggerFactory.getLogger(AbstractExchangeGroup.class);
+
+    protected final URL url;
+
+    protected final Map<URL, ExchangeServer> servers = new ConcurrentHashMap<URL, ExchangeServer>();
+
+    protected final Map<URL, ExchangeClient> clients = new ConcurrentHashMap<URL, ExchangeClient>();
+
+    protected final ExchangeHandlerDispatcher dispatcher = new ExchangeHandlerDispatcher();
+
+    public AbstractExchangeGroup(URL url) {
+        if (url == null) {
+            throw new IllegalArgumentException("url == null");
+        }
+        this.url = url;
+    }
+
+    @Override
+    public URL getUrl() {
+        return url;
+    }
+
+    @Override
+    public void close() {
+        for (URL url : new ArrayList<URL>(servers.keySet())) {
+            try {
+                leave(url);
+            } catch (Throwable t) {
+                logger.error(t.getMessage(), t);
+            }
+        }
+        for (URL url : new ArrayList<URL>(clients.keySet())) {
+            try {
+                disconnect(url);
+            } catch (Throwable t) {
+                logger.error(t.getMessage(), t);
+            }
+        }
+    }
+
+    @Override
+    public Peer join(URL url, ChannelHandler handler) throws RemotingException {
+        return join(url, (ExchangeHandler) handler);
+    }
+
+    @Override
+    public ExchangePeer join(URL url, ExchangeHandler handler) throws RemotingException {
+        ExchangeServer server = servers.get(url);
+        if (server == null) { // TODO exist concurrent gap
+            server = Exchangers.bind(url, handler);
+            servers.put(url, server);
+            dispatcher.addChannelHandler(handler);
+        }
+        return new ExchangeServerPeer(server, clients, this);
+    }
+
+    @Override
+    public void leave(URL url) throws RemotingException {
+        RemotingServer server = servers.remove(url);
+        if (server != null) {
+            server.close();
+        }
+    }
+
+    protected Client connect(URL url) throws RemotingException {
+        if (servers.containsKey(url)) {
+            return null;
+        }
+        ExchangeClient client = clients.get(url);
+        if (client == null) { // TODO exist concurrent gap
+            client = Exchangers.connect(url, dispatcher);
+            clients.put(url, client);
+        }
+        return client;
+    }
+
+    protected void disconnect(URL url) throws RemotingException {
+        Client client = clients.remove(url);
+        if (client != null) {
+            client.close();
+        }
+    }
+
+}
diff --git a/dubbo-remoting-extensions/dubbo-remoting-p2p/src/main/java/org/apache/dubbo/remoting/p2p/exchange/support/ExchangeServerPeer.java b/dubbo-remoting-extensions/dubbo-remoting-p2p/src/main/java/org/apache/dubbo/remoting/p2p/exchange/support/ExchangeServerPeer.java
new file mode 100644
index 0000000..5f3aebd
--- /dev/null
+++ b/dubbo-remoting-extensions/dubbo-remoting-p2p/src/main/java/org/apache/dubbo/remoting/p2p/exchange/support/ExchangeServerPeer.java
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dubbo.remoting.p2p.exchange.support;
+
+import org.apache.dubbo.common.URL;
+import org.apache.dubbo.common.logger.Logger;
+import org.apache.dubbo.common.logger.LoggerFactory;
+import org.apache.dubbo.remoting.Channel;
+import org.apache.dubbo.remoting.Client;
+import org.apache.dubbo.remoting.Constants;
+import org.apache.dubbo.remoting.RemotingException;
+import org.apache.dubbo.remoting.exchange.ExchangeChannel;
+import org.apache.dubbo.remoting.exchange.ExchangeClient;
+import org.apache.dubbo.remoting.exchange.ExchangeServer;
+import org.apache.dubbo.remoting.exchange.support.ExchangeServerDelegate;
+import org.apache.dubbo.remoting.p2p.exchange.ExchangeGroup;
+import org.apache.dubbo.remoting.p2p.exchange.ExchangePeer;
+
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Map;
+
+/**
+ * ServerPeer
+ */
+public class ExchangeServerPeer extends ExchangeServerDelegate implements ExchangePeer {
+
+    private static final Logger logger = LoggerFactory.getLogger(ExchangeServerPeer.class);
+
+    private final Map<URL, ExchangeClient> clients;
+
+    private final ExchangeGroup group;
+
+    public ExchangeServerPeer(ExchangeServer server, Map<URL, ExchangeClient> clients, ExchangeGroup group) {
+        super(server);
+        this.clients = clients;
+        this.group = group;
+    }
+
+    @Override
+    public void leave() throws RemotingException {
+        group.leave(getUrl());
+    }
+
+    @Override
+    public void close() {
+        try {
+            leave();
+        } catch (RemotingException e) {
+            logger.error(e.getMessage(), e);
+        }
+    }
+
+    @SuppressWarnings({"unchecked", "rawtypes"})
+    @Override
+    public Collection<Channel> getChannels() {
+        return (Collection) getExchangeChannels();
+    }
+
+    @Override
+    public Channel getChannel(InetSocketAddress remoteAddress) {
+        return getExchangeChannel(remoteAddress);
+    }
+
+    @Override
+    public Collection<ExchangeChannel> getExchangeChannels() {
+        Collection<ExchangeChannel> channels = super.getExchangeChannels();
+        if (clients.size() > 0) {
+            channels = channels == null ? new ArrayList<ExchangeChannel>() : new ArrayList<ExchangeChannel>(channels);
+            channels.addAll(clients.values());
+        }
+        return channels;
+    }
+
+    @Override
+    public ExchangeChannel getExchangeChannel(InetSocketAddress remoteAddress) {
+        String host = remoteAddress.getAddress() != null ? remoteAddress.getAddress().getHostAddress() : remoteAddress.getHostName();
+        int port = remoteAddress.getPort();
+        ExchangeChannel channel = super.getExchangeChannel(remoteAddress);
+        if (channel == null) {
+            for (Map.Entry<URL, ExchangeClient> entry : clients.entrySet()) {
+                URL url = entry.getKey();
+                if (url.getIp().equals(host) && url.getPort() == port) {
+                    return entry.getValue();
+                }
+            }
+        }
+        return channel;
+    }
+
+    @Override
+    public void send(Object message) throws RemotingException {
+        send(message, getUrl().getParameter(Constants.SENT_KEY, false));
+    }
+
+    @Override
+    public void send(Object message, boolean sent) throws RemotingException {
+        Throwable last = null;
+        try {
+            super.send(message, sent);
+        } catch (Throwable t) {
+            last = t;
+        }
+        for (Client client : clients.values()) {
+            try {
+                client.send(message, sent);
+            } catch (Throwable t) {
+                last = t;
+            }
+        }
+        if (last != null) {
+            if (last instanceof RemotingException) {
+                throw (RemotingException) last;
+            } else if (last instanceof RuntimeException) {
+                throw (RuntimeException) last;
+            } else {
+                throw new RuntimeException(last.getMessage(), last);
+            }
+        }
+    }
+
+}
diff --git a/dubbo-remoting-extensions/dubbo-remoting-p2p/src/main/java/org/apache/dubbo/remoting/p2p/exchange/support/FileExchangeGroup.java b/dubbo-remoting-extensions/dubbo-remoting-p2p/src/main/java/org/apache/dubbo/remoting/p2p/exchange/support/FileExchangeGroup.java
new file mode 100644
index 0000000..2ea192e
--- /dev/null
+++ b/dubbo-remoting-extensions/dubbo-remoting-p2p/src/main/java/org/apache/dubbo/remoting/p2p/exchange/support/FileExchangeGroup.java
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dubbo.remoting.p2p.exchange.support;
+
+import org.apache.dubbo.common.URL;
+import org.apache.dubbo.common.utils.ExecutorUtil;
+import org.apache.dubbo.common.utils.IOUtils;
+import org.apache.dubbo.common.utils.NamedThreadFactory;
+import org.apache.dubbo.common.utils.NetUtils;
+import org.apache.dubbo.remoting.RemotingException;
+import org.apache.dubbo.remoting.exchange.ExchangeHandler;
+import org.apache.dubbo.remoting.p2p.exchange.ExchangePeer;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * FileGroup
+ */
+public class FileExchangeGroup extends AbstractExchangeGroup {
+
+    private final File file;
+    // scheduled executor service
+    private final ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(3, new NamedThreadFactory("FileGroupModifiedChecker", true));
+    // Reconnect the timer to check whether the connection is available at a time, and when unavailable, an infinite reconnection
+    private final ScheduledFuture<?> checkModifiedFuture;
+    private volatile long last;
+
+    public FileExchangeGroup(URL url) {
+        super(url);
+        String path = url.getHost() + "/" + url.getPath();
+        file = new File(path);
+        if (!file.exists()) {
+            throw new IllegalStateException("The group file not exists. file: " + path);
+        }
+        checkModifiedFuture = scheduledExecutorService.scheduleWithFixedDelay(new Runnable() {
+            @Override
+            public void run() {
+                // check the file change
+                try {
+                    check();
+                } catch (Throwable t) { // Defensive fault tolerance
+                    logger.error("Unexpected error occur at reconnect, cause: " + t.getMessage(), t);
+                }
+            }
+        }, 2000, 2000, TimeUnit.MILLISECONDS);
+    }
+
+    @Override
+    public void close() {
+        super.close();
+        try {
+            ExecutorUtil.cancelScheduledFuture(checkModifiedFuture);
+        } catch (Throwable t) {
+            logger.error(t.getMessage(), t);
+        }
+    }
+
+    private void check() throws RemotingException {
+        long modified = file.lastModified();
+        if (modified > last) {
+            last = modified;
+            changed();
+        }
+    }
+
+    private void changed() throws RemotingException {
+        try {
+            String[] lines = IOUtils.readLines(file);
+            for (String line : lines) {
+                connect(URL.valueOf(line));
+            }
+        } catch (IOException e) {
+            throw new RemotingException(new InetSocketAddress(NetUtils.getLocalHost(), 0), getUrl().toInetSocketAddress(), e.getMessage(), e);
+        }
+    }
+
+    public ExchangePeer joinExchange(URL url, ExchangeHandler handler) throws RemotingException {
+        ExchangePeer peer = super.join(url, handler);
+        try {
+            String full = url.toFullString();
+            String[] lines = IOUtils.readLines(file);
+            for (String line : lines) {
+                if (full.equals(line)) {
+                    return peer;
+                }
+            }
+            IOUtils.appendLines(file, new String[]{full});
+        } catch (IOException e) {
+            throw new RemotingException(new InetSocketAddress(NetUtils.getLocalHost(), 0), getUrl().toInetSocketAddress(), e.getMessage(), e);
+        }
+        return peer;
+    }
+
+    @Override
+    public void leave(URL url) throws RemotingException {
+        super.leave(url);
+        try {
+            String full = url.toFullString();
+            String[] lines = IOUtils.readLines(file);
+            List<String> saves = new ArrayList<String>();
+            for (String line : lines) {
+                if (full.equals(line)) {
+                    return;
+                }
+                saves.add(line);
+            }
+            IOUtils.appendLines(file, saves.toArray(new String[0]));
+        } catch (IOException e) {
+            throw new RemotingException(new InetSocketAddress(NetUtils.getLocalHost(), 0), getUrl().toInetSocketAddress(), e.getMessage(), e);
+        }
+    }
+
+}
diff --git a/dubbo-remoting-extensions/dubbo-remoting-p2p/src/main/java/org/apache/dubbo/remoting/p2p/exchange/support/FileExchangeNetworker.java b/dubbo-remoting-extensions/dubbo-remoting-p2p/src/main/java/org/apache/dubbo/remoting/p2p/exchange/support/FileExchangeNetworker.java
new file mode 100644
index 0000000..d6d90c7
--- /dev/null
+++ b/dubbo-remoting-extensions/dubbo-remoting-p2p/src/main/java/org/apache/dubbo/remoting/p2p/exchange/support/FileExchangeNetworker.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dubbo.remoting.p2p.exchange.support;
+
+import org.apache.dubbo.common.URL;
+import org.apache.dubbo.remoting.RemotingException;
+import org.apache.dubbo.remoting.p2p.exchange.ExchangeGroup;
+import org.apache.dubbo.remoting.p2p.exchange.ExchangeNetworker;
+
+/**
+ * FileNetworker
+ */
+public class FileExchangeNetworker implements ExchangeNetworker {
+
+    @Override
+    public ExchangeGroup lookup(URL url) throws RemotingException {
+        return new FileExchangeGroup(url);
+    }
+
+}
diff --git a/dubbo-remoting-extensions/dubbo-remoting-p2p/src/main/java/org/apache/dubbo/remoting/p2p/exchange/support/MulticastExchangeGroup.java b/dubbo-remoting-extensions/dubbo-remoting-p2p/src/main/java/org/apache/dubbo/remoting/p2p/exchange/support/MulticastExchangeGroup.java
new file mode 100644
index 0000000..f21f873
--- /dev/null
+++ b/dubbo-remoting-extensions/dubbo-remoting-p2p/src/main/java/org/apache/dubbo/remoting/p2p/exchange/support/MulticastExchangeGroup.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dubbo.remoting.p2p.exchange.support;
+
+import org.apache.dubbo.common.URL;
+import org.apache.dubbo.common.utils.NetUtils;
+import org.apache.dubbo.remoting.RemotingException;
+import org.apache.dubbo.remoting.exchange.ExchangeHandler;
+import org.apache.dubbo.remoting.p2p.exchange.ExchangePeer;
+
+import java.io.IOException;
+import java.net.DatagramPacket;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.net.MulticastSocket;
+
+/**
+ * MulticastGroup
+ */
+public class MulticastExchangeGroup extends AbstractExchangeGroup {
+
+    private static final String JOIN = "join";
+
+    private static final String LEAVE = "leave";
+
+    private InetAddress multicastAddress;
+
+    private MulticastSocket multicastSocket;
+
+    public MulticastExchangeGroup(URL url) {
+        super(url);
+        if (!NetUtils.isMulticastAddress(url.getHost())) {
+            throw new IllegalArgumentException("Invalid multicast address " + url.getHost() + ", scope: 224.0.0.0 - 239.255.255.255");
+        }
+        try {
+            multicastAddress = InetAddress.getByName(url.getHost());
+            multicastSocket = new MulticastSocket(url.getPort());
+            multicastSocket.setLoopbackMode(false);
+            multicastSocket.joinGroup(multicastAddress);
+            Thread thread = new Thread(new Runnable() {
+                @Override
+                public void run() {
+                    byte[] buf = new byte[1024];
+                    DatagramPacket recv = new DatagramPacket(buf, buf.length);
+                    while (true) {
+                        try {
+                            multicastSocket.receive(recv);
+                            MulticastExchangeGroup.this.receive(new String(recv.getData()).trim(), (InetSocketAddress) recv.getSocketAddress());
+                        } catch (Exception e) {
+                            logger.error(e.getMessage(), e);
+                        }
+                    }
+                }
+            }, "MulticastGroupReceiver");
+            thread.setDaemon(true);
+            thread.start();
+        } catch (IOException e) {
+            throw new IllegalStateException(e.getMessage(), e);
+        }
+    }
+
+    private void send(String msg) throws RemotingException {
+        DatagramPacket hi = new DatagramPacket(msg.getBytes(), msg.length(), multicastAddress, multicastSocket.getLocalPort());
+        try {
+            multicastSocket.send(hi);
+        } catch (IOException e) {
+            throw new IllegalStateException(e.getMessage(), e);
+        }
+    }
+
+    private void receive(String msg, InetSocketAddress remoteAddress) throws RemotingException {
+        if (msg.startsWith(JOIN)) {
+            String url = msg.substring(JOIN.length()).trim();
+            connect(URL.valueOf(url));
+        } else if (msg.startsWith(LEAVE)) {
+            String url = msg.substring(LEAVE.length()).trim();
+            disconnect(URL.valueOf(url));
+        }
+    }
+
+    @Override
+    public ExchangePeer join(URL url, ExchangeHandler handler) throws RemotingException {
+        ExchangePeer peer = super.join(url, handler);
+        send(JOIN + " " + url.toFullString());
+        return peer;
+    }
+
+    @Override
+    public void leave(URL url) throws RemotingException {
+        super.leave(url);
+        send(LEAVE + " " + url.toFullString());
+    }
+
+}
diff --git a/dubbo-remoting-extensions/dubbo-remoting-p2p/src/main/java/org/apache/dubbo/remoting/p2p/exchange/support/MulticastExchangeNetworker.java b/dubbo-remoting-extensions/dubbo-remoting-p2p/src/main/java/org/apache/dubbo/remoting/p2p/exchange/support/MulticastExchangeNetworker.java
new file mode 100644
index 0000000..18d6186
--- /dev/null
+++ b/dubbo-remoting-extensions/dubbo-remoting-p2p/src/main/java/org/apache/dubbo/remoting/p2p/exchange/support/MulticastExchangeNetworker.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dubbo.remoting.p2p.exchange.support;
+
+import org.apache.dubbo.common.URL;
+import org.apache.dubbo.remoting.RemotingException;
+import org.apache.dubbo.remoting.p2p.exchange.ExchangeGroup;
+import org.apache.dubbo.remoting.p2p.exchange.ExchangeNetworker;
+
+/**
+ * MulticastNetworker
+ */
+public class MulticastExchangeNetworker implements ExchangeNetworker {
+
+    @Override
+    public ExchangeGroup lookup(URL url) throws RemotingException {
+        return new MulticastExchangeGroup(url);
+    }
+
+}
diff --git a/dubbo-remoting-extensions/dubbo-remoting-p2p/src/main/java/org/apache/dubbo/remoting/p2p/support/AbstractGroup.java b/dubbo-remoting-extensions/dubbo-remoting-p2p/src/main/java/org/apache/dubbo/remoting/p2p/support/AbstractGroup.java
new file mode 100644
index 0000000..0eec957
--- /dev/null
+++ b/dubbo-remoting-extensions/dubbo-remoting-p2p/src/main/java/org/apache/dubbo/remoting/p2p/support/AbstractGroup.java
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dubbo.remoting.p2p.support;
+
+import org.apache.dubbo.common.URL;
+import org.apache.dubbo.common.logger.Logger;
+import org.apache.dubbo.common.logger.LoggerFactory;
+import org.apache.dubbo.remoting.ChannelHandler;
+import org.apache.dubbo.remoting.Client;
+import org.apache.dubbo.remoting.RemotingException;
+import org.apache.dubbo.remoting.RemotingServer;
+import org.apache.dubbo.remoting.Transporters;
+import org.apache.dubbo.remoting.p2p.Group;
+import org.apache.dubbo.remoting.p2p.Peer;
+import org.apache.dubbo.remoting.transport.ChannelHandlerDispatcher;
+
+import java.util.ArrayList;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * AbstractGroup
+ */
+public abstract class AbstractGroup implements Group {
+
+    // log output
+    protected static final Logger logger = LoggerFactory.getLogger(AbstractGroup.class);
+
+    protected final URL url;
+
+    protected final Map<URL, RemotingServer> servers = new ConcurrentHashMap<URL, RemotingServer>();
+
+    protected final Map<URL, Client> clients = new ConcurrentHashMap<URL, Client>();
+
+    protected final ChannelHandlerDispatcher dispatcher = new ChannelHandlerDispatcher();
+
+    public AbstractGroup(URL url) {
+        if (url == null) {
+            throw new IllegalArgumentException("url == null");
+        }
+        this.url = url;
+    }
+
+    @Override
+    public URL getUrl() {
+        return url;
+    }
+
+    @Override
+    public void close() {
+        for (URL url : new ArrayList<URL>(servers.keySet())) {
+            try {
+                leave(url);
+            } catch (Throwable t) {
+                logger.error(t.getMessage(), t);
+            }
+        }
+        for (URL url : new ArrayList<URL>(clients.keySet())) {
+            try {
+                disconnect(url);
+            } catch (Throwable t) {
+                logger.error(t.getMessage(), t);
+            }
+        }
+    }
+
+    @Override
+    public Peer join(URL url, ChannelHandler handler) throws RemotingException {
+        RemotingServer server = servers.get(url);
+        if (server == null) { // TODO exist concurrent gap
+            server = Transporters.bind(url, handler);
+            servers.put(url, server);
+            dispatcher.addChannelHandler(handler);
+        }
+        return new ServerPeer(server, clients, this);
+    }
+
+    @Override
+    public void leave(URL url) throws RemotingException {
+        RemotingServer server = servers.remove(url);
+        if (server != null) {
+            server.close();
+        }
+    }
+
+    protected Client connect(URL url) throws RemotingException {
+        if (servers.containsKey(url)) {
+            return null;
+        }
+        Client client = clients.get(url);
+        if (client == null) { // TODO exist concurrent gap
+            client = Transporters.connect(url, dispatcher);
+            clients.put(url, client);
+        }
+        return client;
+    }
+
+    protected void disconnect(URL url) throws RemotingException {
+        Client client = clients.remove(url);
+        if (client != null) {
+            client.close();
+        }
+    }
+
+}
diff --git a/dubbo-remoting-extensions/dubbo-remoting-p2p/src/main/java/org/apache/dubbo/remoting/p2p/support/FileGroup.java b/dubbo-remoting-extensions/dubbo-remoting-p2p/src/main/java/org/apache/dubbo/remoting/p2p/support/FileGroup.java
new file mode 100644
index 0000000..aa69c08
--- /dev/null
+++ b/dubbo-remoting-extensions/dubbo-remoting-p2p/src/main/java/org/apache/dubbo/remoting/p2p/support/FileGroup.java
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dubbo.remoting.p2p.support;
+
+import org.apache.dubbo.common.URL;
+import org.apache.dubbo.common.utils.ExecutorUtil;
+import org.apache.dubbo.common.utils.IOUtils;
+import org.apache.dubbo.common.utils.NamedThreadFactory;
+import org.apache.dubbo.common.utils.NetUtils;
+import org.apache.dubbo.remoting.ChannelHandler;
+import org.apache.dubbo.remoting.RemotingException;
+import org.apache.dubbo.remoting.p2p.Peer;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * FileGroup
+ */
+public class FileGroup extends AbstractGroup {
+
+    private final File file;
+    // Scheduled executor service
+    private final ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(3, new NamedThreadFactory("FileGroupModifiedChecker", true));
+    // Reconnect the timer to check whether the connection is available at a time, and when unavailable, an infinite reconnection
+    private final ScheduledFuture<?> checkModifiedFuture;
+    private volatile long last;
+
+    public FileGroup(URL url) {
+        super(url);
+        String path = url.getAbsolutePath();
+        file = new File(path);
+        checkModifiedFuture = scheduledExecutorService.scheduleWithFixedDelay(new Runnable() {
+            @Override
+            public void run() {
+                // Check the file change
+                try {
+                    check();
+                } catch (Throwable t) { // Defensive fault tolerance
+                    logger.error("Unexpected error occur at reconnect, cause: " + t.getMessage(), t);
+                }
+            }
+        }, 2000, 2000, TimeUnit.MILLISECONDS);
+    }
+
+    @Override
+    public void close() {
+        super.close();
+        try {
+            ExecutorUtil.cancelScheduledFuture(checkModifiedFuture);
+        } catch (Throwable t) {
+            logger.error(t.getMessage(), t);
+        }
+    }
+
+    private void check() throws RemotingException {
+        long modified = file.lastModified();
+        if (modified > last) {
+            last = modified;
+            changed();
+        }
+    }
+
+    private void changed() throws RemotingException {
+        try {
+            String[] lines = IOUtils.readLines(file);
+            for (String line : lines) {
+                connect(URL.valueOf(line));
+            }
+        } catch (IOException e) {
+            throw new RemotingException(new InetSocketAddress(NetUtils.getLocalHost(), 0), getUrl().toInetSocketAddress(), e.getMessage(), e);
+        }
+    }
+
+    @Override
+    public Peer join(URL url, ChannelHandler handler) throws RemotingException {
+        Peer peer = super.join(url, handler);
+        try {
+            String full = url.toFullString();
+            String[] lines = IOUtils.readLines(file);
+            for (String line : lines) {
+                if (full.equals(line)) {
+                    return peer;
+                }
+            }
+            IOUtils.appendLines(file, new String[]{full});
+        } catch (IOException e) {
+            throw new RemotingException(new InetSocketAddress(NetUtils.getLocalHost(), 0), getUrl().toInetSocketAddress(), e.getMessage(), e);
+        }
+        return peer;
+    }
+
+    @Override
+    public void leave(URL url) throws RemotingException {
+        super.leave(url);
+        try {
+            String full = url.toFullString();
+            String[] lines = IOUtils.readLines(file);
+            List<String> saves = new ArrayList<String>();
+            for (String line : lines) {
+                if (full.equals(line)) {
+                    return;
+                }
+                saves.add(line);
+            }
+            IOUtils.appendLines(file, saves.toArray(new String[0]));
+        } catch (IOException e) {
+            throw new RemotingException(new InetSocketAddress(NetUtils.getLocalHost(), 0), getUrl().toInetSocketAddress(), e.getMessage(), e);
+        }
+    }
+
+}
diff --git a/dubbo-remoting-extensions/dubbo-remoting-p2p/src/main/java/org/apache/dubbo/remoting/p2p/support/FileNetworker.java b/dubbo-remoting-extensions/dubbo-remoting-p2p/src/main/java/org/apache/dubbo/remoting/p2p/support/FileNetworker.java
new file mode 100644
index 0000000..e9fd1b0
--- /dev/null
+++ b/dubbo-remoting-extensions/dubbo-remoting-p2p/src/main/java/org/apache/dubbo/remoting/p2p/support/FileNetworker.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dubbo.remoting.p2p.support;
+
+import org.apache.dubbo.common.URL;
+import org.apache.dubbo.remoting.RemotingException;
+import org.apache.dubbo.remoting.p2p.Group;
+import org.apache.dubbo.remoting.p2p.Networker;
+
+/**
+ * FileNetworker
+ */
+public class FileNetworker implements Networker {
+
+    @Override
+    public Group lookup(URL url) throws RemotingException {
+        return new FileGroup(url);
+    }
+
+}
diff --git a/dubbo-remoting-extensions/dubbo-remoting-p2p/src/main/java/org/apache/dubbo/remoting/p2p/support/MulticastGroup.java b/dubbo-remoting-extensions/dubbo-remoting-p2p/src/main/java/org/apache/dubbo/remoting/p2p/support/MulticastGroup.java
new file mode 100644
index 0000000..8af1722
--- /dev/null
+++ b/dubbo-remoting-extensions/dubbo-remoting-p2p/src/main/java/org/apache/dubbo/remoting/p2p/support/MulticastGroup.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dubbo.remoting.p2p.support;
+
+import org.apache.dubbo.common.URL;
+import org.apache.dubbo.common.utils.NetUtils;
+import org.apache.dubbo.remoting.ChannelHandler;
+import org.apache.dubbo.remoting.RemotingException;
+import org.apache.dubbo.remoting.p2p.Peer;
+
+import java.io.IOException;
+import java.net.DatagramPacket;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.net.MulticastSocket;
+
+/**
+ * MulticastGroup
+ */
+public class MulticastGroup extends AbstractGroup {
+
+    private static final String JOIN = "join";
+
+    private static final String LEAVE = "leave";
+
+    private InetAddress multicastAddress;
+
+    private MulticastSocket multicastSocket;
+
+    public MulticastGroup(URL url) {
+        super(url);
+        if (!NetUtils.isMulticastAddress(url.getHost())) {
+            throw new IllegalArgumentException("Invalid multicast address " + url.getHost() + ", scope: 224.0.0.0 - 239.255.255.255");
+        }
+        try {
+            multicastAddress = InetAddress.getByName(url.getHost());
+            multicastSocket = new MulticastSocket(url.getPort());
+            multicastSocket.setLoopbackMode(false);
+            multicastSocket.joinGroup(multicastAddress);
+            Thread thread = new Thread(new Runnable() {
+                @Override
+                public void run() {
+                    byte[] buf = new byte[1024];
+                    DatagramPacket recv = new DatagramPacket(buf, buf.length);
+                    while (true) {
+                        try {
+                            multicastSocket.receive(recv);
+                            MulticastGroup.this.receive(new String(recv.getData()).trim(), (InetSocketAddress) recv.getSocketAddress());
+                        } catch (Exception e) {
+                            logger.error(e.getMessage(), e);
+                        }
+                    }
+                }
+            }, "MulticastGroupReceiver");
+            thread.setDaemon(true);
+            thread.start();
+        } catch (IOException e) {
+            throw new IllegalStateException(e.getMessage(), e);
+        }
+    }
+
+    private void send(String msg) throws RemotingException {
+        DatagramPacket hi = new DatagramPacket(msg.getBytes(), msg.length(), multicastAddress, multicastSocket.getLocalPort());
+        try {
+            multicastSocket.send(hi);
+        } catch (IOException e) {
+            throw new IllegalStateException(e.getMessage(), e);
+        }
+    }
+
+    private void receive(String msg, InetSocketAddress remoteAddress) throws RemotingException {
+        if (msg.startsWith(JOIN)) {
+            String url = msg.substring(JOIN.length()).trim();
+            connect(URL.valueOf(url));
+        } else if (msg.startsWith(LEAVE)) {
+            String url = msg.substring(LEAVE.length()).trim();
+            disconnect(URL.valueOf(url));
+        }
+    }
+
+    @Override
+    public Peer join(URL url, ChannelHandler handler) throws RemotingException {
+        Peer peer = super.join(url, handler);
+        send(JOIN + " " + url.toFullString());
+        return peer;
+    }
+
+    @Override
+    public void leave(URL url) throws RemotingException {
+        super.leave(url);
+        send(LEAVE + " " + url.toFullString());
+    }
+
+}
diff --git a/dubbo-remoting-extensions/dubbo-remoting-p2p/src/main/java/org/apache/dubbo/remoting/p2p/support/MulticastNetworker.java b/dubbo-remoting-extensions/dubbo-remoting-p2p/src/main/java/org/apache/dubbo/remoting/p2p/support/MulticastNetworker.java
new file mode 100644
index 0000000..c706591
--- /dev/null
+++ b/dubbo-remoting-extensions/dubbo-remoting-p2p/src/main/java/org/apache/dubbo/remoting/p2p/support/MulticastNetworker.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dubbo.remoting.p2p.support;
+
+import org.apache.dubbo.common.URL;
+import org.apache.dubbo.remoting.RemotingException;
+import org.apache.dubbo.remoting.p2p.Group;
+import org.apache.dubbo.remoting.p2p.Networker;
+
+/**
+ * MulticastNetworker
+ */
+public class MulticastNetworker implements Networker {
+
+    @Override
+    public Group lookup(URL url) throws RemotingException {
+        return new MulticastGroup(url);
+    }
+
+}
diff --git a/dubbo-remoting-extensions/dubbo-remoting-p2p/src/main/java/org/apache/dubbo/remoting/p2p/support/ServerPeer.java b/dubbo-remoting-extensions/dubbo-remoting-p2p/src/main/java/org/apache/dubbo/remoting/p2p/support/ServerPeer.java
new file mode 100644
index 0000000..d5a2575
--- /dev/null
+++ b/dubbo-remoting-extensions/dubbo-remoting-p2p/src/main/java/org/apache/dubbo/remoting/p2p/support/ServerPeer.java
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dubbo.remoting.p2p.support;
+
+import org.apache.dubbo.common.URL;
+import org.apache.dubbo.common.logger.Logger;
+import org.apache.dubbo.common.logger.LoggerFactory;
+import org.apache.dubbo.remoting.Channel;
+import org.apache.dubbo.remoting.Client;
+import org.apache.dubbo.remoting.Constants;
+import org.apache.dubbo.remoting.RemotingException;
+import org.apache.dubbo.remoting.RemotingServer;
+import org.apache.dubbo.remoting.p2p.Group;
+import org.apache.dubbo.remoting.p2p.Peer;
+import org.apache.dubbo.remoting.transport.ServerDelegate;
+
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Map;
+
+/**
+ * ServerPeer
+ */
+public class ServerPeer extends ServerDelegate implements Peer {
+
+    private static final Logger logger = LoggerFactory.getLogger(ServerPeer.class);
+
+    private final Map<URL, Client> clients;
+
+    private final Group group;
+
+    public ServerPeer(RemotingServer server, Map<URL, Client> clients, Group group) {
+        super(server);
+        this.clients = clients;
+        this.group = group;
+    }
+
+    @Override
+    public void leave() throws RemotingException {
+        group.leave(getUrl());
+    }
+
+    @Override
+    public void close() {
+        try {
+            leave();
+        } catch (RemotingException e) {
+            logger.error(e.getMessage(), e);
+        }
+    }
+
+    @Override
+    public Collection<Channel> getChannels() {
+        Collection<Channel> channels = super.getChannels();
+        if (clients.size() > 0) {
+            channels = channels == null ? new ArrayList<Channel>() : new ArrayList<Channel>(channels);
+            channels.addAll(clients.values());
+        }
+        return channels;
+    }
+
+    @Override
+    public Channel getChannel(InetSocketAddress remoteAddress) {
+        String host = remoteAddress.getAddress() != null ? remoteAddress.getAddress().getHostAddress() : remoteAddress.getHostName();
+        int port = remoteAddress.getPort();
+        Channel channel = super.getChannel(remoteAddress);
+        if (channel == null) {
+            for (Map.Entry<URL, Client> entry : clients.entrySet()) {
+                URL url = entry.getKey();
+                if (url.getIp().equals(host) && url.getPort() == port) {
+                    return entry.getValue();
+                }
+            }
+        }
+        return channel;
+    }
+
+    @Override
+    public void send(Object message) throws RemotingException {
+        send(message, getUrl().getParameter(Constants.SENT_KEY, false));
+    }
+
+    @Override
+    public void send(Object message, boolean sent) throws RemotingException {
+        Throwable last = null;
+        try {
+            super.send(message, sent);
+        } catch (Throwable t) {
+            last = t;
+        }
+        for (Client client : clients.values()) {
+            try {
+                client.send(message, sent);
+            } catch (Throwable t) {
+                last = t;
+            }
+        }
+        if (last != null) {
+            if (last instanceof RemotingException) {
+                throw (RemotingException) last;
+            } else if (last instanceof RuntimeException) {
+                throw (RuntimeException) last;
+            } else {
+                throw new RuntimeException(last.getMessage(), last);
+            }
+        }
+    }
+
+}
diff --git a/dubbo-remoting-extensions/dubbo-remoting-p2p/src/main/resources/META-INF/dubbo/internal/org.apache.dubbo.remoting.p2p.Networker b/dubbo-remoting-extensions/dubbo-remoting-p2p/src/main/resources/META-INF/dubbo/internal/org.apache.dubbo.remoting.p2p.Networker
new file mode 100644
index 0000000..0712e65
--- /dev/null
+++ b/dubbo-remoting-extensions/dubbo-remoting-p2p/src/main/resources/META-INF/dubbo/internal/org.apache.dubbo.remoting.p2p.Networker
@@ -0,0 +1,2 @@
+multicast=org.apache.dubbo.remoting.p2p.support.MulticastNetworker
+file=org.apache.dubbo.remoting.p2p.support.FileNetworker
diff --git a/dubbo-remoting-extensions/dubbo-remoting-p2p/src/test/java/org/apache/dubbo/remoting/p2p/exchange/support/MulticastExchangeNetworkerTest.java b/dubbo-remoting-extensions/dubbo-remoting-p2p/src/test/java/org/apache/dubbo/remoting/p2p/exchange/support/MulticastExchangeNetworkerTest.java
new file mode 100644
index 0000000..573f245
--- /dev/null
+++ b/dubbo-remoting-extensions/dubbo-remoting-p2p/src/test/java/org/apache/dubbo/remoting/p2p/exchange/support/MulticastExchangeNetworkerTest.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dubbo.remoting.p2p.exchange.support;
+
+import org.apache.dubbo.common.URL;
+import org.apache.dubbo.common.utils.NetUtils;
+import org.apache.dubbo.remoting.Channel;
+import org.apache.dubbo.remoting.RemotingException;
+import org.apache.dubbo.remoting.exchange.ExchangeChannel;
+import org.apache.dubbo.remoting.exchange.ExchangeHandler;
+import org.apache.dubbo.remoting.exchange.support.ExchangeHandlerAdapter;
+import org.apache.dubbo.remoting.p2p.Group;
+import org.apache.dubbo.remoting.p2p.Networkers;
+import org.apache.dubbo.remoting.p2p.Peer;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import static org.hamcrest.CoreMatchers.instanceOf;
+import static org.hamcrest.CoreMatchers.not;
+import static org.hamcrest.CoreMatchers.nullValue;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.mockito.Mockito.mock;
+
+public class MulticastExchangeNetworkerTest {
+    @Test
+    public void testJoin() throws RemotingException, InterruptedException {
+        final String groupURL = "multicast://224.5.6.7:1234";
+
+        MulticastExchangeNetworker multicastExchangeNetworker = new MulticastExchangeNetworker();
+
+        final CountDownLatch countDownLatch = new CountDownLatch(1);
+        Peer peer1 = multicastExchangeNetworker.lookup(URL.valueOf(groupURL))
+                .join(URL.valueOf("exchange://0.0.0.0:" + NetUtils.getAvailablePort() + "?exchanger=header"), new ExchangeHandlerAdapter() {
+                    @Override
+                    public CompletableFuture<Object> reply(ExchangeChannel channel, Object msg) throws RemotingException {
+                        countDownLatch.countDown();
+                        return super.reply(channel, msg);
+                    }
+                });
+        Peer peer2 = multicastExchangeNetworker.lookup(URL.valueOf(groupURL))
+                .join(URL.valueOf("exchange://0.0.0.0:" + NetUtils.getAvailablePort() + "?exchanger=header"), mock(ExchangeHandler.class));
+
+        while (true) {
+            for (Channel channel : peer1.getChannels()) {
+                channel.send("hello multicast exchange network!");
+            }
+            TimeUnit.MILLISECONDS.sleep(50);
+
+            long count = countDownLatch.getCount();
+            if (count > 0) {
+                break;
+            }
+        }
+
+        Group lookup = Networkers.lookup(groupURL);
+        assertThat(lookup, not(nullValue()));
+
+        assertThat(peer1, instanceOf(ExchangeServerPeer.class));
+
+        peer1.close();
+        peer2.close();
+    }
+}
diff --git a/dubbo-remoting-extensions/dubbo-remoting-p2p/src/test/java/org/apache/dubbo/remoting/p2p/support/FileNetworkerTest.java b/dubbo-remoting-extensions/dubbo-remoting-p2p/src/test/java/org/apache/dubbo/remoting/p2p/support/FileNetworkerTest.java
new file mode 100644
index 0000000..d947761
--- /dev/null
+++ b/dubbo-remoting-extensions/dubbo-remoting-p2p/src/test/java/org/apache/dubbo/remoting/p2p/support/FileNetworkerTest.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dubbo.remoting.p2p.support;
+
+import org.apache.dubbo.common.URL;
+import org.apache.dubbo.common.utils.NetUtils;
+import org.apache.dubbo.remoting.Channel;
+import org.apache.dubbo.remoting.RemotingException;
+import org.apache.dubbo.remoting.p2p.Group;
+import org.apache.dubbo.remoting.p2p.Peer;
+import org.apache.dubbo.remoting.transport.ChannelHandlerAdapter;
+
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+import java.nio.file.Path;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import static org.mockito.Mockito.mock;
+
+public class FileNetworkerTest {
+
+    @BeforeEach
+    public void setUp(@TempDir Path folder) throws Exception {
+        folder.toFile().createNewFile();
+    }
+
+    @AfterEach
+    public void tearDown(@TempDir Path folder) {
+        folder.getFileName().toAbsolutePath().toFile().delete();
+    }
+
+    @Test
+    public void testJoin(@TempDir Path folder) throws RemotingException, InterruptedException {
+        final String groupURL = "file:///" + folder.getFileName().toAbsolutePath();
+
+        FileNetworker networker = new FileNetworker();
+        Group group = networker.lookup(URL.valueOf(groupURL));
+
+        final CountDownLatch countDownLatch = new CountDownLatch(1);
+        Peer peer1 = group.join(URL.valueOf("exchange://0.0.0.0:" + NetUtils.getAvailablePort() + "?exchanger=header"), new ChannelHandlerAdapter() {
+            @Override
+            public void received(Channel channel, Object message) {
+                countDownLatch.countDown();
+            }
+        });
+        Peer peer2 = group.join(URL.valueOf("exchange://0.0.0.0:" + NetUtils.getAvailablePort() + "?exchanger=header"),
+                mock(ChannelHandlerAdapter.class));
+
+        while (true) {
+            long count = countDownLatch.getCount();
+            if (count > 0) {
+                break;
+            }
+            for (Channel channel : peer1.getChannels()) {
+                channel.send(0, false);
+                channel.send("hello world!");
+            }
+            TimeUnit.MILLISECONDS.sleep(50);
+        }
+
+
+        peer2.close();
+        peer1.close();
+    }
+}
diff --git a/dubbo-remoting-extensions/dubbo-remoting-p2p/src/test/java/org/apache/dubbo/remoting/p2p/support/MulticastNetworkerTest.java b/dubbo-remoting-extensions/dubbo-remoting-p2p/src/test/java/org/apache/dubbo/remoting/p2p/support/MulticastNetworkerTest.java
new file mode 100644
index 0000000..88d7e97
--- /dev/null
+++ b/dubbo-remoting-extensions/dubbo-remoting-p2p/src/test/java/org/apache/dubbo/remoting/p2p/support/MulticastNetworkerTest.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dubbo.remoting.p2p.support;
+
+import org.apache.dubbo.common.utils.NetUtils;
+import org.apache.dubbo.remoting.Channel;
+import org.apache.dubbo.remoting.RemotingException;
+import org.apache.dubbo.remoting.p2p.Group;
+import org.apache.dubbo.remoting.p2p.Networkers;
+import org.apache.dubbo.remoting.p2p.Peer;
+import org.apache.dubbo.remoting.transport.ChannelHandlerAdapter;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import static org.hamcrest.CoreMatchers.not;
+import static org.hamcrest.CoreMatchers.nullValue;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.mockito.Mockito.mock;
+
+public class MulticastNetworkerTest {
+
+    @Test
+    public void testJoin() throws RemotingException, InterruptedException {
+        final String groupURL = "multicast://224.5.6.7:1234";
+        final String peerURL = "exchange://0.0.0.0:" + NetUtils.getAvailablePort() + "?exchanger=header";
+
+        final CountDownLatch countDownLatch = new CountDownLatch(1);
+        Peer peer1 = Networkers.join(groupURL, peerURL, new ChannelHandlerAdapter() {
+            @Override
+            public void received(Channel channel, Object message) {
+                countDownLatch.countDown();
+            }
+        });
+        Peer peer2 = Networkers.join(groupURL, "exchange://0.0.0.0:" + NetUtils.getAvailablePort() + "?exchanger=header",
+                mock(ChannelHandlerAdapter.class));
+
+        while (true) {
+            long count = countDownLatch.getCount();
+            if (count > 0) {
+                break;
+            }
+            for (Channel channel : peer1.getChannels()) {
+                channel.send("hello world!");
+            }
+            TimeUnit.MILLISECONDS.sleep(50);
+        }
+
+        Group lookup = Networkers.lookup(groupURL);
+        assertThat(lookup, not(nullValue()));
+
+        peer2.close();
+        peer1.close();
+    }
+}
diff --git a/dubbo-remoting-extensions/dubbo-remoting-redis/pom.xml b/dubbo-remoting-extensions/dubbo-remoting-redis/pom.xml
new file mode 100644
index 0000000..e357bd1
--- /dev/null
+++ b/dubbo-remoting-extensions/dubbo-remoting-redis/pom.xml
@@ -0,0 +1,53 @@
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements.  See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+  -->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <parent>
+        <artifactId>dubbo-remoting-extensions</artifactId>
+        <groupId>org.apache.dubbo.extensions</groupId>
+        <version>${revision}</version>
+        <relativePath>../pom.xml</relativePath>
+    </parent>
+
+    <modelVersion>4.0.0</modelVersion>
+    <version>1.0.0-SNAPSHOT</version>
+    <artifactId>dubbo-remoting-redis</artifactId>
+
+    <packaging>jar</packaging>
+
+    <name>${project.artifactId}</name>
+    <description>The redis remoting module of dubbo project</description>
+
+    <properties>
+        <skip_maven_deploy>false</skip_maven_deploy>
+        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+    </properties>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.dubbo</groupId>
+            <artifactId>dubbo-remoting-api</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.dubbo</groupId>
+            <artifactId>dubbo-common</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>redis.clients</groupId>
+            <artifactId>jedis</artifactId>
+        </dependency>
+    </dependencies>
+</project>
diff --git a/dubbo-remoting-extensions/dubbo-remoting-redis/src/main/java/org/apache/dubbo/remoting/redis/RedisClient.java b/dubbo-remoting-extensions/dubbo-remoting-redis/src/main/java/org/apache/dubbo/remoting/redis/RedisClient.java
new file mode 100644
index 0000000..7b2d7b8
--- /dev/null
+++ b/dubbo-remoting-extensions/dubbo-remoting-redis/src/main/java/org/apache/dubbo/remoting/redis/RedisClient.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dubbo.remoting.redis;
+
+import redis.clients.jedis.JedisPubSub;
+
+import java.util.Map;
+import java.util.Set;
+
+public interface RedisClient {
+    Long hset(String key, String field, String value);
+
+    Long publish(String channel, String message);
+
+//    void clean(String pattern);
+
+    boolean isConnected();
+
+    void destroy();
+
+    Long hdel(final String key, final String... fields);
+
+    Set<String> scan(String pattern);
+
+    Map<String, String> hgetAll(String key);
+
+    void psubscribe(final JedisPubSub jedisPubSub, final String... patterns);
+
+    void disconnect();
+
+    void close();
+}
diff --git a/dubbo-remoting-extensions/dubbo-remoting-redis/src/main/java/org/apache/dubbo/remoting/redis/jedis/ClusterRedisClient.java b/dubbo-remoting-extensions/dubbo-remoting-redis/src/main/java/org/apache/dubbo/remoting/redis/jedis/ClusterRedisClient.java
new file mode 100644
index 0000000..e0e4c14
--- /dev/null
+++ b/dubbo-remoting-extensions/dubbo-remoting-redis/src/main/java/org/apache/dubbo/remoting/redis/jedis/ClusterRedisClient.java
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dubbo.remoting.redis.jedis;
+
+import org.apache.dubbo.common.URL;
+import org.apache.dubbo.common.logger.Logger;
+import org.apache.dubbo.common.logger.LoggerFactory;
+import org.apache.dubbo.common.utils.StringUtils;
+import org.apache.dubbo.remoting.redis.RedisClient;
+import org.apache.dubbo.remoting.redis.support.AbstractRedisClient;
+
+import redis.clients.jedis.HostAndPort;
+import redis.clients.jedis.Jedis;
+import redis.clients.jedis.JedisCluster;
+import redis.clients.jedis.JedisPool;
+import redis.clients.jedis.JedisPubSub;
+
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.regex.Pattern;
+
+import static org.apache.dubbo.common.constants.CommonConstants.COMMA_SPLIT_PATTERN;
+
+public class ClusterRedisClient extends AbstractRedisClient implements RedisClient {
+    private static final Logger logger = LoggerFactory.getLogger(ClusterRedisClient.class);
+
+    private static final int DEFAULT_TIMEOUT = 2000;
+
+    private static final int DEFAULT_SO_TIMEOUT = 2000;
+
+    private static final int DEFAULT_MAX_ATTEMPTS = 5;
+
+    private JedisCluster jedisCluster;
+    private Pattern COLON_SPLIT_PATTERN = Pattern.compile("\\s*[:]+\\s*");
+
+    public ClusterRedisClient(URL url) {
+        super(url);
+        Set<HostAndPort> nodes = getNodes(url);
+        jedisCluster = new JedisCluster(nodes, url.getParameter("connection.timeout", DEFAULT_TIMEOUT),
+                url.getParameter("so.timeout", DEFAULT_SO_TIMEOUT), url.getParameter("max.attempts", DEFAULT_MAX_ATTEMPTS),
+                url.getPassword(), getConfig());
+    }
+
+    @Override
+    public Long hset(String key, String field, String value) {
+        return jedisCluster.hset(key, field, value);
+    }
+
+    @Override
+    public Long publish(String channel, String message) {
+        return jedisCluster.publish(channel, message);
+    }
+
+    @Override
+    public boolean isConnected() {
+        Map<String, JedisPool> poolMap = jedisCluster.getClusterNodes();
+        for (JedisPool jedisPool : poolMap.values()) {
+            Jedis jedis = jedisPool.getResource();
+            if (jedis.isConnected()) {
+                return true;
+            }
+        }
+        return false;
+    }
+
+    @Override
+    public void destroy() {
+        jedisCluster.close();
+    }
+
+    @Override
+    public Long hdel(String key, String... fields) {
+        return jedisCluster.hdel(key, fields);
+    }
+
+    @Override
+    public Set<String> scan(String pattern) {
+        Map<String, JedisPool> nodes = jedisCluster.getClusterNodes();
+        Set<String> result = new HashSet<>();
+        for (JedisPool jedisPool : nodes.values()) {
+            Jedis jedis = jedisPool.getResource();
+            result.addAll(scan(jedis, pattern));
+            jedis.close();
+        }
+        return result;
+    }
+
+    @Override
+    public Map<String, String> hgetAll(String key) {
+        return jedisCluster.hgetAll(key);
+    }
+
+    @Override
+    public void psubscribe(JedisPubSub jedisPubSub, String... patterns) {
+        jedisCluster.psubscribe(jedisPubSub, patterns);
+    }
+
+    @Override
+    public void disconnect() {
+        jedisCluster.close();
+    }
+
+    @Override
+    public void close() {
+        jedisCluster.close();
+    }
+
+    private Set<HostAndPort> getNodes(URL url) {
+        Set<HostAndPort> hostAndPorts = new HashSet<>();
+        hostAndPorts.add(new HostAndPort(url.getHost(), url.getPort()));
+        String backupAddresses = url.getBackupAddress(6379);
+        String[] nodes = StringUtils.isEmpty(backupAddresses) ? new String[0] : COMMA_SPLIT_PATTERN.split(backupAddresses);
+        if (nodes.length > 0) {
+            for (String node : nodes) {
+                String[] hostAndPort = COLON_SPLIT_PATTERN.split(node);
+                hostAndPorts.add(new HostAndPort(hostAndPort[0], Integer.valueOf(hostAndPort[1])));
+            }
+        }
+        return hostAndPorts;
+    }
+}
diff --git a/dubbo-remoting-extensions/dubbo-remoting-redis/src/main/java/org/apache/dubbo/remoting/redis/jedis/MonoRedisClient.java b/dubbo-remoting-extensions/dubbo-remoting-redis/src/main/java/org/apache/dubbo/remoting/redis/jedis/MonoRedisClient.java
new file mode 100644
index 0000000..864f18e
--- /dev/null
+++ b/dubbo-remoting-extensions/dubbo-remoting-redis/src/main/java/org/apache/dubbo/remoting/redis/jedis/MonoRedisClient.java
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dubbo.remoting.redis.jedis;
+
+import org.apache.dubbo.common.URL;
+import org.apache.dubbo.common.logger.Logger;
+import org.apache.dubbo.common.logger.LoggerFactory;
+import org.apache.dubbo.remoting.redis.RedisClient;
+import org.apache.dubbo.remoting.redis.support.AbstractRedisClient;
+
+import redis.clients.jedis.Jedis;
+import redis.clients.jedis.JedisPool;
+import redis.clients.jedis.JedisPubSub;
+
+import java.util.Map;
+import java.util.Set;
+
+import static org.apache.dubbo.common.constants.CommonConstants.DEFAULT_TIMEOUT;
+import static org.apache.dubbo.common.constants.CommonConstants.TIMEOUT_KEY;
+
+public class MonoRedisClient extends AbstractRedisClient implements RedisClient {
+    private static final Logger logger = LoggerFactory.getLogger(MonoRedisClient.class);
+
+    private static final String START_CURSOR = "0";
+
+    private JedisPool jedisPool;
+
+    public MonoRedisClient(URL url) {
+        super(url);
+        jedisPool = new JedisPool(getConfig(), url.getHost(), url.getPort(),
+                url.getParameter(TIMEOUT_KEY, DEFAULT_TIMEOUT), url.getPassword());
+    }
+
+    @Override
+    public Long hset(String key, String field, String value) {
+        Jedis jedis = jedisPool.getResource();
+        Long result = jedis.hset(key, field, value);
+        jedis.close();
+        return result;
+    }
+
+    @Override
+    public Long publish(String channel, String message) {
+        Jedis jedis = jedisPool.getResource();
+        Long result = jedis.publish(channel, message);
+        jedis.close();
+        return result;
+    }
+
+    @Override
+    public boolean isConnected() {
+        Jedis jedis = jedisPool.getResource();
+        boolean connected = jedis.isConnected();
+        jedis.close();
+        return connected;
+    }
+
+    @Override
+    public void destroy() {
+        jedisPool.close();
+    }
+
+    @Override
+    public Long hdel(String key, String... fields) {
+        Jedis jedis = jedisPool.getResource();
+        Long result = jedis.hdel(key, fields);
+        jedis.close();
+        return result;
+    }
+
+    @Override
+    public Set<String> scan(String pattern) {
+        Jedis jedis = jedisPool.getResource();
+        Set<String> result = super.scan(jedis, pattern);
+        jedis.close();
+        return result;
+    }
+
+    @Override
+    public Map<String, String> hgetAll(String key) {
+        Jedis jedis = jedisPool.getResource();
+        Map<String, String> result = jedis.hgetAll(key);
+        jedis.close();
+        return result;
+    }
+
+    @Override
+    public void psubscribe(JedisPubSub jedisPubSub, String... patterns) {
+        Jedis jedis = jedisPool.getResource();
+        jedis.psubscribe(jedisPubSub, patterns);
+        jedis.close();
+    }
+
+    @Override
+    public void disconnect() {
+        jedisPool.close();
+    }
+
+    @Override
+    public void close() {
+        jedisPool.close();
+    }
+
+
+}
diff --git a/dubbo-remoting-extensions/dubbo-remoting-redis/src/main/java/org/apache/dubbo/remoting/redis/jedis/SentinelRedisClient.java b/dubbo-remoting-extensions/dubbo-remoting-redis/src/main/java/org/apache/dubbo/remoting/redis/jedis/SentinelRedisClient.java
new file mode 100644
index 0000000..6a10bf9
--- /dev/null
+++ b/dubbo-remoting-extensions/dubbo-remoting-redis/src/main/java/org/apache/dubbo/remoting/redis/jedis/SentinelRedisClient.java
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dubbo.remoting.redis.jedis;
+
+import org.apache.dubbo.common.URL;
+import org.apache.dubbo.common.constants.RemotingConstants;
+import org.apache.dubbo.common.logger.Logger;
+import org.apache.dubbo.common.logger.LoggerFactory;
+import org.apache.dubbo.remoting.redis.RedisClient;
+import org.apache.dubbo.remoting.redis.support.AbstractRedisClient;
+
+import redis.clients.jedis.Jedis;
+import redis.clients.jedis.JedisPubSub;
+import redis.clients.jedis.JedisSentinelPool;
+
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+public class SentinelRedisClient extends AbstractRedisClient implements RedisClient {
+    private static final Logger logger = LoggerFactory.getLogger(SentinelRedisClient.class);
+
+    private JedisSentinelPool sentinelPool;
+
+    public SentinelRedisClient(URL url) {
+        super(url);
+        String masterName = url.getParameter("master.name", "Sentinel-master");
+        String address = (new StringBuilder()).append(url.getAddress()).toString();
+        String[] backupAddresses = url.getParameter(RemotingConstants.BACKUP_KEY, new String[0]);
+        if (backupAddresses.length == 0) {
+            throw new IllegalStateException("Sentinel addresses can not be empty");
+        }
+        Set<String> sentinels = new HashSet<>(Arrays.asList(backupAddresses));
+        sentinels.add(address);
+        sentinelPool = new JedisSentinelPool(masterName, sentinels, getConfig(), url.getPassword());
+    }
+
+    @Override
+    public Long hset(String key, String field, String value) {
+        Jedis jedis = sentinelPool.getResource();
+        Long result = jedis.hset(key, field, value);
+        jedis.close();
+        return result;
+    }
+
+    @Override
+    public Long publish(String channel, String message) {
+        Jedis jedis = sentinelPool.getResource();
+        Long result = jedis.publish(channel, message);
+        jedis.close();
+        return result;
+    }
+
+    @Override
+    public boolean isConnected() {
+        Jedis jedis = sentinelPool.getResource();
+        boolean result = jedis.isConnected();
+        jedis.close();
+        return result;
+    }
+
+    @Override
+    public void destroy() {
+        sentinelPool.close();
+    }
+
+    @Override
+    public Long hdel(String key, String... fields) {
+        Jedis jedis = sentinelPool.getResource();
+        Long result = jedis.hdel(key, fields);
+        jedis.close();
+        return result;
+    }
+
+    @Override
+    public Set<String> scan(String pattern) {
+        Jedis jedis = sentinelPool.getResource();
+        Set<String> result = scan(jedis, pattern);
+        jedis.close();
+        return result;
+    }
+
+    @Override
+    public Map<String, String> hgetAll(String key) {
+        Jedis jedis = sentinelPool.getResource();
+        Map<String, String> result = jedis.hgetAll(key);
+        jedis.close();
+        return result;
+    }
+
+    @Override
+    public void psubscribe(JedisPubSub jedisPubSub, String... patterns) {
+        Jedis jedis = sentinelPool.getResource();
+        jedis.psubscribe(jedisPubSub, patterns);
+        jedis.close();
+    }
+
+    @Override
+    public void disconnect() {
+        sentinelPool.close();
+    }
+
+    @Override
+    public void close() {
+        sentinelPool.close();
+    }
+}
diff --git a/dubbo-remoting-extensions/dubbo-remoting-redis/src/main/java/org/apache/dubbo/remoting/redis/support/AbstractRedisClient.java b/dubbo-remoting-extensions/dubbo-remoting-redis/src/main/java/org/apache/dubbo/remoting/redis/support/AbstractRedisClient.java
new file mode 100644
index 0000000..4af640e
--- /dev/null
+++ b/dubbo-remoting-extensions/dubbo-remoting-redis/src/main/java/org/apache/dubbo/remoting/redis/support/AbstractRedisClient.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dubbo.remoting.redis.support;
+
+import org.apache.dubbo.common.URL;
+import org.apache.dubbo.common.utils.CollectionUtils;
+import org.apache.dubbo.remoting.redis.RedisClient;
+
+import redis.clients.jedis.Jedis;
+import redis.clients.jedis.JedisPoolConfig;
+import redis.clients.jedis.ScanParams;
+import redis.clients.jedis.ScanResult;
+
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+public abstract class AbstractRedisClient implements RedisClient {
+    private URL url;
+
+    private JedisPoolConfig config;
+
+    public AbstractRedisClient(URL url) {
+        this.url = url;
+        config = new JedisPoolConfig();
+        config.setTestOnBorrow(url.getParameter("test.on.borrow", true));
+        config.setTestOnReturn(url.getParameter("test.on.return", false));
+        config.setTestWhileIdle(url.getParameter("test.while.idle", false));
+        if (url.getParameter("max.idle", 0) > 0) {
+            config.setMaxIdle(url.getParameter("max.idle", 0));
+        }
+        if (url.getParameter("min.idle", 0) > 0) {
+            config.setMinIdle(url.getParameter("min.idle", 0));
+        }
+        if (url.getParameter("max.active", 0) > 0) {
+            config.setMaxTotal(url.getParameter("max.active", 0));
+        }
+        if (url.getParameter("max.total", 0) > 0) {
+            config.setMaxTotal(url.getParameter("max.total", 0));
+        }
+        if (url.getParameter("max.wait", url.getParameter("timeout", 0)) > 0) {
+            config.setMaxWaitMillis(url.getParameter("max.wait", url.getParameter("timeout", 0)));
+        }
+        if (url.getParameter("num.tests.per.eviction.run", 0) > 0) {
+            config.setNumTestsPerEvictionRun(url.getParameter("num.tests.per.eviction.run", 0));
+        }
+        if (url.getParameter("time.between.eviction.runs.millis", 0) > 0) {
+            config.setTimeBetweenEvictionRunsMillis(url.getParameter("time.between.eviction.runs.millis", 0));
+        }
+        if (url.getParameter("min.evictable.idle.time.millis", 0) > 0) {
+            config.setMinEvictableIdleTimeMillis(url.getParameter("min.evictable.idle.time.millis", 0));
+        }
+    }
+
+    protected Set<String> scan(Jedis jedis, String pattern) {
+        Set<String> result = new HashSet<>();
+        String cursor = ScanParams.SCAN_POINTER_START;
+        ScanParams params = new ScanParams();
+        params.match(pattern);
+        while (true) {
+            ScanResult<String> scanResult = jedis.scan(cursor, params);
+            List<String> list = scanResult.getResult();
+            if (CollectionUtils.isNotEmpty(list)) {
+                result.addAll(list);
+            }
+            if (ScanParams.SCAN_POINTER_START.equals(scanResult.getCursor())) {
+                break;
+            }
+            cursor = scanResult.getCursor();
+        }
+        return result;
+    }
+
+    public URL getUrl() {
+        return url;
+    }
+
+    public JedisPoolConfig getConfig() {
+        return config;
+    }
+}
diff --git a/dubbo-remoting-extensions/pom.xml b/dubbo-remoting-extensions/pom.xml
index 6a6b72a..9b52ee1 100644
--- a/dubbo-remoting-extensions/pom.xml
+++ b/dubbo-remoting-extensions/pom.xml
@@ -32,6 +32,11 @@
 
     <modules>
         <module>dubbo-remoting-quic</module>
+        <module>dubbo-remoting-etcd3</module>
+        <module>dubbo-remoting-grizzly</module>
+        <module>dubbo-remoting-mina</module>
+        <module>dubbo-remoting-p2p</module>
+        <module>dubbo-remoting-redis</module>
     </modules>