You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dubbo.apache.org by il...@apache.org on 2019/03/16 05:12:55 UTC

[incubator-dubbo] branch 2.7.1-release updated: [Dubbo-3653] etcd as config center (#3663)

This is an automated email from the ASF dual-hosted git repository.

iluo pushed a commit to branch 2.7.1-release
in repository https://gitbox.apache.org/repos/asf/incubator-dubbo.git


The following commit(s) were added to refs/heads/2.7.1-release by this push:
     new cf23d61  [Dubbo-3653] etcd as config center (#3663)
cf23d61 is described below

commit cf23d612f9ba3f1c508071b420260eb695a787d8
Author: Huxing Zhang <hu...@gmail.com>
AuthorDate: Sat Mar 16 13:11:33 2019 +0800

    [Dubbo-3653] etcd as config center (#3663)
    
    * Minor refactor, no functional change.
    
    * Separate ConnectionStateListener
    
    * Simplify code
    
    * Fix typo
    
    * Support get external config from etcd config center
    
    * Polish diamond operator
    
    * Initial etcd support as config center
    
    * Add a put interface for JEtcdClient
    
    * Enhanced Etcd config center support with the ability to watch and cancel watch
    
    * Polish code
    
    * Distinguish modification event and delete event
    
    * Add etcd registry and configcenter to dubbo-all
    
    * Watch again when connection is re-established
---
 dubbo-all/pom.xml                                  |  16 ++
 dubbo-bom/pom.xml                                  |   5 +
 dubbo-configcenter/dubbo-configcenter-etcd/pom.xml |  46 ++++
 .../support/etcd/EtcdDynamicConfiguration.java     | 187 ++++++++++++++
 .../etcd/EtcdDynamicConfigurationFactory.java      |  33 +++
 ....dubbo.configcenter.DynamicConfigurationFactory |   1 +
 .../support/etcd/EtcdDynamicConfigurationTest.java | 141 +++++++++++
 dubbo-configcenter/pom.xml                         |   1 +
 .../apache/dubbo/registry/etcd/EtcdRegistry.java   |  18 +-
 .../org/apache/dubbo/remoting/etcd/EtcdClient.java |  16 ++
 .../etcd/jetcd/ConnectionStateListener.java        |  31 +++
 .../dubbo/remoting/etcd/jetcd/JEtcdClient.java     |  22 +-
 .../remoting/etcd/jetcd/JEtcdClientWrapper.java    | 282 +++++++++++----------
 .../remoting/etcd/support/AbstractEtcdClient.java  |   4 +-
 .../dubbo/remoting/etcd/jetcd/JEtcdClientTest.java | 166 ++++++++++++
 15 files changed, 811 insertions(+), 158 deletions(-)

diff --git a/dubbo-all/pom.xml b/dubbo-all/pom.xml
index edb0f36..a4a0b5c 100644
--- a/dubbo-all/pom.xml
+++ b/dubbo-all/pom.xml
@@ -243,6 +243,13 @@
         </dependency>
         <dependency>
             <groupId>org.apache.dubbo</groupId>
+            <artifactId>dubbo-registry-etcd3</artifactId>
+            <version>${project.version}</version>
+            <scope>compile</scope>
+            <optional>true</optional>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.dubbo</groupId>
             <artifactId>dubbo-monitor-api</artifactId>
             <version>${project.version}</version>
             <scope>compile</scope>
@@ -362,6 +369,13 @@
         </dependency>
         <dependency>
             <groupId>org.apache.dubbo</groupId>
+            <artifactId>dubbo-configcenter-etcd</artifactId>
+            <version>${project.version}</version>
+            <scope>compile</scope>
+            <optional>true</optional>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.dubbo</groupId>
             <artifactId>dubbo-compatible</artifactId>
             <version>${project.version}</version>
             <scope>compile</scope>
@@ -494,6 +508,7 @@
                                     <include>org.apache.dubbo:dubbo-registry-zookeeper</include>
                                     <include>org.apache.dubbo:dubbo-registry-redis</include>
                                     <include>org.apache.dubbo:dubbo-registry-consul</include>
+                                    <include>org.apache.dubbo:dubbo-registry-etcd3</include>
                                     <include>org.apache.dubbo:dubbo-monitor-api</include>
                                     <include>org.apache.dubbo:dubbo-monitor-default</include>
                                     <include>org.apache.dubbo:dubbo-config-api</include>
@@ -515,6 +530,7 @@
                                     <include>org.apache.dubbo:dubbo-configcenter-apollo</include>
                                     <include>org.apache.dubbo:dubbo-configcenter-zookeeper</include>
                                     <include>org.apache.dubbo:dubbo-configcenter-consul</include>
+                                    <include>org.apache.dubbo:dubbo-configcenter-etcd</include>
                                     <include>org.apache.dubbo:dubbo-metadata-report-api</include>
                                     <include>org.apache.dubbo:dubbo-metadata-definition</include>
                                     <include>org.apache.dubbo:dubbo-metadata-report-redis</include>
diff --git a/dubbo-bom/pom.xml b/dubbo-bom/pom.xml
index d04495a..47875b8 100644
--- a/dubbo-bom/pom.xml
+++ b/dubbo-bom/pom.xml
@@ -345,6 +345,11 @@
             </dependency>
             <dependency>
                 <groupId>org.apache.dubbo</groupId>
+                <artifactId>dubbo-configcenter-etcd</artifactId>
+                <version>${project.version}</version>
+            </dependency>
+            <dependency>
+                <groupId>org.apache.dubbo</groupId>
                 <artifactId>dubbo-metadata-definition</artifactId>
                 <version>${project.version}</version>
             </dependency>
diff --git a/dubbo-configcenter/dubbo-configcenter-etcd/pom.xml b/dubbo-configcenter/dubbo-configcenter-etcd/pom.xml
new file mode 100644
index 0000000..60efc8e
--- /dev/null
+++ b/dubbo-configcenter/dubbo-configcenter-etcd/pom.xml
@@ -0,0 +1,46 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one or more
+  ~ contributor license agreements.  See the NOTICE file distributed with
+  ~ this work for additional information regarding copyright ownership.
+  ~ The ASF licenses this file to You under the Apache License, Version 2.0
+  ~ (the "License"); you may not use this file except in compliance with
+  ~ the License.  You may obtain a copy of the License at
+  ~
+  ~     http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  -->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <parent>
+        <artifactId>dubbo-configcenter</artifactId>
+        <groupId>org.apache.dubbo</groupId>
+        <version>2.7.1-SNAPSHOT</version>
+    </parent>
+    <modelVersion>4.0.0</modelVersion>
+
+    <artifactId>dubbo-configcenter-etcd</artifactId>
+    <packaging>jar</packaging>
+    <name>${project.artifactId}</name>
+    <description>The etcd implementation of the config-center api</description>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.dubbo</groupId>
+            <artifactId>dubbo-configcenter-api</artifactId>
+            <version>${project.parent.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.dubbo</groupId>
+            <artifactId>dubbo-remoting-etcd3</artifactId>
+            <version>${project.parent.version}</version>
+        </dependency>
+    </dependencies>
+</project>
\ No newline at end of file
diff --git a/dubbo-configcenter/dubbo-configcenter-etcd/src/main/java/org/apache/dubbo/configcenter/support/etcd/EtcdDynamicConfiguration.java b/dubbo-configcenter/dubbo-configcenter-etcd/src/main/java/org/apache/dubbo/configcenter/support/etcd/EtcdDynamicConfiguration.java
new file mode 100644
index 0000000..18e9088
--- /dev/null
+++ b/dubbo-configcenter/dubbo-configcenter-etcd/src/main/java/org/apache/dubbo/configcenter/support/etcd/EtcdDynamicConfiguration.java
@@ -0,0 +1,187 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dubbo.configcenter.support.etcd;
+
+import com.google.protobuf.ByteString;
+import io.etcd.jetcd.api.Event;
+import io.etcd.jetcd.api.WatchCancelRequest;
+import io.etcd.jetcd.api.WatchCreateRequest;
+import io.etcd.jetcd.api.WatchGrpc;
+import io.etcd.jetcd.api.WatchRequest;
+import io.etcd.jetcd.api.WatchResponse;
+import io.grpc.ManagedChannel;
+import io.grpc.stub.StreamObserver;
+import org.apache.dubbo.common.URL;
+import org.apache.dubbo.common.utils.StringUtils;
+import org.apache.dubbo.configcenter.ConfigChangeEvent;
+import org.apache.dubbo.configcenter.ConfigChangeType;
+import org.apache.dubbo.configcenter.ConfigurationListener;
+import org.apache.dubbo.configcenter.DynamicConfiguration;
+import org.apache.dubbo.remoting.etcd.StateListener;
+import org.apache.dubbo.remoting.etcd.jetcd.JEtcdClient;
+
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static org.apache.dubbo.common.Constants.CONFIG_NAMESPACE_KEY;
+import static org.apache.dubbo.common.Constants.PATH_SEPARATOR;
+
+/**
+ * The etcd implementation of {@link DynamicConfiguration}: values are read through {@link JEtcdClient} and key changes are delivered by {@link EtcdConfigWatcher}
+ */
+public class EtcdDynamicConfiguration implements DynamicConfiguration {
+
+    /**
+     * Root path of all config keys: "/" + namespace + "/config" (namespace is the URL parameter CONFIG_NAMESPACE_KEY, falling back to DEFAULT_GROUP)
+     */
+    private String rootPath;
+
+    /**
+     * The etcd client, used to read values and to obtain the gRPC channel for watches
+     */
+    private final JEtcdClient etcdClient;
+
+    /**
+     * Maps each registered {@link ConfigurationListener} to the {@link EtcdConfigWatcher} created for it
+     */
+    private final ConcurrentMap<ConfigurationListener, EtcdConfigWatcher> watchListenerMap;
+
+    EtcdDynamicConfiguration(URL url) {
+        rootPath = "/" + url.getParameter(CONFIG_NAMESPACE_KEY, DEFAULT_GROUP) + "/config";
+        etcdClient = new JEtcdClient(url);
+        etcdClient.addStateListener(state -> {  // on every CONNECTED notification, start all known watches again
+            if (state == StateListener.CONNECTED) {
+                try {
+                    recover();
+                } catch (Exception e) {
+                    // NOTE(review): any failure while re-creating the watches is swallowed here without logging
+                }
+            }
+        });
+        watchListenerMap = new ConcurrentHashMap<>();  // NOTE(review): assigned after the state listener is registered; a CONNECTED callback before this line would hit a null map (caught above)
+    }
+
+    @Override
+    public void addListener(String key, String group, ConfigurationListener listener) {  // NOTE(review): group is not used, only key determines the watched path
+        if (watchListenerMap.get(listener) == null) {  // a listener that is already registered is left untouched
+            String normalizedKey = convertKey(key);
+            EtcdConfigWatcher watcher = new EtcdConfigWatcher(normalizedKey, listener);
+            watchListenerMap.put(listener, watcher);
+            watcher.watch();
+        }
+    }
+
+    @Override
+    public void removeListener(String key, String group, ConfigurationListener listener) {  // NOTE(review): key and group are not used
+        EtcdConfigWatcher watcher = watchListenerMap.get(listener);  // NOTE(review): the entry stays in the map, so addListener will skip this listener afterwards
+        watcher.cancelWatch();  // NOTE(review): throws NullPointerException when the listener was never added
+    }
+
+    // TODO Abstract the logic into super class
+    @Override
+    public String getConfig(String key, String group, long timeout) throws IllegalStateException {  // NOTE(review): timeout is not used
+        if (StringUtils.isNotEmpty(group)) {
+            key = group + PATH_SEPARATOR + key;
+        } else {
+            int i = key.lastIndexOf(".");  // without a group, the last '.' in the key is replaced by PATH_SEPARATOR
+            key = key.substring(0, i) + PATH_SEPARATOR + key.substring(i + 1);  // NOTE(review): i is -1 for a key without '.', which makes substring(0, i) throw
+        }
+        return (String) getInternalProperty(rootPath + PATH_SEPARATOR + key);
+    }
+
+    @Override
+    public Object getInternalProperty(String key) {
+        return etcdClient.getKVValue(key);  // key is passed to the client as given; rootPath is not prepended here
+    }
+
+
+    private String convertKey(String key) {
+        int index = key.lastIndexOf('.');  // same last-dot split as the no-group branch of getConfig; NOTE(review): index is -1 for a key without '.', which makes substring throw
+        return rootPath + PATH_SEPARATOR + key.substring(0, index) + PATH_SEPARATOR + key.substring(index + 1);
+    }
+
+    private void recover() {
+        for (EtcdConfigWatcher watcher: watchListenerMap.values()) {
+            watcher.watch();
+        }
+    }
+
+    public class EtcdConfigWatcher implements StreamObserver<WatchResponse> {  // watches one key and forwards its events to one listener
+
+        private ConfigurationListener listener;
+        protected WatchGrpc.WatchStub watchStub;
+        private StreamObserver<WatchRequest> observer;
+        protected long watchId;
+        private ManagedChannel channel;
+        private String key;
+
+        public EtcdConfigWatcher(String key, ConfigurationListener listener) {
+            this.key = key;
+            this.listener = listener;
+            this.channel = etcdClient.getChannel();
+        }
+
+        @Override
+        public void onNext(WatchResponse watchResponse) {
+            this.watchId = watchResponse.getWatchId();  // remembered so that cancelWatch() can name the watch to cancel
+            for (Event etcdEvent : watchResponse.getEventsList()) {
+                ConfigChangeType type = ConfigChangeType.MODIFIED;  // every event that is not a DELETE is reported as MODIFIED
+                if (etcdEvent.getType() == Event.EventType.DELETE) {
+                    type = ConfigChangeType.DELETED;
+                }
+                ConfigChangeEvent event = new ConfigChangeEvent(
+                        etcdEvent.getKv().getKey().toString(UTF_8),
+                        etcdEvent.getKv().getValue().toString(UTF_8), type);
+                listener.process(event);
+            }
+        }
+
+        @Override
+        public void onError(Throwable throwable) {
+            // NOTE(review): stream errors are dropped; nothing is logged and the watch is not re-created here
+        }
+
+        @Override
+        public void onCompleted() {
+            // nothing is done when the stream completes
+        }
+
+        public long getWatchId() {
+            return watchId;
+        }
+
+        private void watch() {
+            watchStub = WatchGrpc.newStub(channel);
+            observer = watchStub.watch(this);  // this object receives the responses, observer is the request side; NOTE(review): a previous observer is overwritten without being completed
+            WatchCreateRequest.Builder builder = WatchCreateRequest.newBuilder()
+                    .setKey(ByteString.copyFromUtf8(key))
+                    .setProgressNotify(true);
+            WatchRequest req = WatchRequest.newBuilder().setCreateRequest(builder).build();
+            observer.onNext(req);
+        }
+
+        private void cancelWatch() {
+            WatchCancelRequest watchCancelRequest =
+                    WatchCancelRequest.newBuilder().setWatchId(watchId).build();  // NOTE(review): watchId is still 0 if no WatchResponse has arrived yet
+            WatchRequest cancelRequest = WatchRequest.newBuilder()
+                    .setCancelRequest(watchCancelRequest).build();
+            observer.onNext(cancelRequest);
+        }
+    }
+}
diff --git a/dubbo-configcenter/dubbo-configcenter-etcd/src/main/java/org/apache/dubbo/configcenter/support/etcd/EtcdDynamicConfigurationFactory.java b/dubbo-configcenter/dubbo-configcenter-etcd/src/main/java/org/apache/dubbo/configcenter/support/etcd/EtcdDynamicConfigurationFactory.java
new file mode 100644
index 0000000..02e91a6
--- /dev/null
+++ b/dubbo-configcenter/dubbo-configcenter-etcd/src/main/java/org/apache/dubbo/configcenter/support/etcd/EtcdDynamicConfigurationFactory.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dubbo.configcenter.support.etcd;
+
+import org.apache.dubbo.common.URL;
+import org.apache.dubbo.configcenter.AbstractDynamicConfigurationFactory;
+import org.apache.dubbo.configcenter.DynamicConfiguration;
+
+/**
+ * The etcd implementation of {@link AbstractDynamicConfigurationFactory}; it creates an {@link EtcdDynamicConfiguration} from the given URL
+ */
+public class EtcdDynamicConfigurationFactory extends AbstractDynamicConfigurationFactory {
+
+    @Override
+    protected DynamicConfiguration createDynamicConfiguration(URL url) {
+        return new EtcdDynamicConfiguration(url);
+    }
+}
diff --git a/dubbo-configcenter/dubbo-configcenter-etcd/src/main/resources/META-INF/dubbo/internal/org.apache.dubbo.configcenter.DynamicConfigurationFactory b/dubbo-configcenter/dubbo-configcenter-etcd/src/main/resources/META-INF/dubbo/internal/org.apache.dubbo.configcenter.DynamicConfigurationFactory
new file mode 100644
index 0000000..d84b1ae
--- /dev/null
+++ b/dubbo-configcenter/dubbo-configcenter-etcd/src/main/resources/META-INF/dubbo/internal/org.apache.dubbo.configcenter.DynamicConfigurationFactory
@@ -0,0 +1 @@
+etcd=org.apache.dubbo.configcenter.support.etcd.EtcdDynamicConfigurationFactory
\ No newline at end of file
diff --git a/dubbo-configcenter/dubbo-configcenter-etcd/src/test/java/org/apache/dubbo/configcenter/support/etcd/EtcdDynamicConfigurationTest.java b/dubbo-configcenter/dubbo-configcenter-etcd/src/test/java/org/apache/dubbo/configcenter/support/etcd/EtcdDynamicConfigurationTest.java
new file mode 100644
index 0000000..87143fd
--- /dev/null
+++ b/dubbo-configcenter/dubbo-configcenter-etcd/src/test/java/org/apache/dubbo/configcenter/support/etcd/EtcdDynamicConfigurationTest.java
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dubbo.configcenter.support.etcd;
+
+import io.etcd.jetcd.ByteSequence;
+import io.etcd.jetcd.Client;
+import org.apache.dubbo.common.Constants;
+import org.apache.dubbo.common.URL;
+import org.apache.dubbo.configcenter.ConfigChangeEvent;
+import org.apache.dubbo.configcenter.ConfigurationListener;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Disabled;
+import org.junit.jupiter.api.Test;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+
+/**
+ * Unit test for etcd config center support; values are written with a plain jetcd client and read back or watched through {@link EtcdDynamicConfiguration}
+ * TODO Integrate with https://github.com/etcd-io/jetcd#launcher or using mock data.
+ */
+@Disabled  // NOTE(review): presumably disabled because it needs a live etcd server at ENDPOINT (see the TODO above) - confirm
+public class EtcdDynamicConfigurationTest {
+
+    private static final String ENDPOINT = "http://127.0.0.1:2379";
+
+    private static EtcdDynamicConfiguration config;
+
+    private static Client etcdClient;
+
+    @Test
+    public void testGetConfig() {
+        put("/dubbo/config/org.apache.dubbo.etcd.testService/configurators", "hello");
+        put("/dubbo/config/test/dubbo.properties", "aaa=bbb");
+        Assertions.assertEquals("hello", config.getConfig("org.apache.dubbo.etcd.testService.configurators"));
+        Assertions.assertEquals("aaa=bbb", config.getConfig("dubbo.properties", "test"));
+    }
+
+    @Test
+    public void testAddListener() throws Exception {
+        CountDownLatch latch = new CountDownLatch(4);  // one notification is expected for each of the four listeners
+        TestListener listener1 = new TestListener(latch);
+        TestListener listener2 = new TestListener(latch);
+        TestListener listener3 = new TestListener(latch);
+        TestListener listener4 = new TestListener(latch);
+        config.addListener("AService.configurators", listener1);
+        config.addListener("AService.configurators", listener2);
+        config.addListener("testapp.tagrouters", listener3);
+        config.addListener("testapp.tagrouters", listener4);
+
+        put("/dubbo/config/AService/configurators", "new value1");
+        Thread.sleep(200);
+        put("/dubbo/config/testapp/tagrouters", "new value2");
+        Thread.sleep(200);
+        put("/dubbo/config/testapp", "new value3");  // a key none of the listeners registered for; the value assertions below expect that no listener receives it
+
+        Thread.sleep(1000);
+
+        Assertions.assertTrue(latch.await(5, TimeUnit.SECONDS));
+        Assertions.assertEquals(1, listener1.getCount("/dubbo/config/AService/configurators"));
+        Assertions.assertEquals(1, listener2.getCount("/dubbo/config/AService/configurators"));
+        Assertions.assertEquals(1, listener3.getCount("/dubbo/config/testapp/tagrouters"));
+        Assertions.assertEquals(1, listener4.getCount("/dubbo/config/testapp/tagrouters"));
+
+        Assertions.assertEquals("new value1", listener1.getValue());
+        Assertions.assertEquals("new value1", listener2.getValue());
+        Assertions.assertEquals("new value2", listener3.getValue());
+        Assertions.assertEquals("new value2", listener4.getValue());
+    }
+
+    private class TestListener implements ConfigurationListener {  // records the last value and a per-key event count
+        private CountDownLatch latch;
+        private String value;
+        private Map<String, Integer> countMap = new HashMap<>();
+
+        public TestListener(CountDownLatch latch) {
+            this.latch = latch;
+        }
+
+        @Override
+        public void process(ConfigChangeEvent event) {
+            Integer count = countMap.computeIfAbsent(event.getKey(), k -> 0);
+            countMap.put(event.getKey(), ++count);
+            value = event.getValue();
+            latch.countDown();
+        }
+
+        public int getCount(String key) {
+            return countMap.get(key);  // NOTE(review): unboxing throws NullPointerException when no event was recorded for key
+        }
+
+        public String getValue() {
+            return value;
+        }
+    }
+
+    static void put(String key, String value) {
+        try {
+            etcdClient.getKVClient()
+                    .put(ByteSequence.from(key, UTF_8), ByteSequence.from(value, UTF_8))
+                    .get();
+        } catch (Exception e) {
+            System.out.println("Error put value to etcd.");  // NOTE(review): the exception is swallowed, so a failed put only surfaces as a later assertion failure
+        }
+    }
+
+    @BeforeAll
+    static void setUp() {
+        etcdClient = Client.builder().endpoints(ENDPOINT).build();
+        // session timeout parameter set to 15000, i.e. 15 seconds.
+        URL url = URL.valueOf("etcd3://127.0.0.1:2379/org.apache.dubbo.etcd.testService")
+                .addParameter(Constants.SESSION_TIMEOUT_KEY, 15000);
+        config = new EtcdDynamicConfiguration(url);
+    }
+
+    @AfterAll
+    static void tearDown() {
+        etcdClient.close();  // only the raw jetcd client is closed; NOTE(review): the client created inside config is not closed here
+    }
+}
diff --git a/dubbo-configcenter/pom.xml b/dubbo-configcenter/pom.xml
index fa703be..92f727d 100644
--- a/dubbo-configcenter/pom.xml
+++ b/dubbo-configcenter/pom.xml
@@ -34,5 +34,6 @@
         <module>dubbo-configcenter-zookeeper</module>
         <module>dubbo-configcenter-apollo</module>
         <module>dubbo-configcenter-consul</module>
+        <module>dubbo-configcenter-etcd</module>
     </modules>
 </project>
diff --git a/dubbo-registry/dubbo-registry-etcd3/src/main/java/org/apache/dubbo/registry/etcd/EtcdRegistry.java b/dubbo-registry/dubbo-registry-etcd3/src/main/java/org/apache/dubbo/registry/etcd/EtcdRegistry.java
index 504d521..f0d9406 100644
--- a/dubbo-registry/dubbo-registry-etcd3/src/main/java/org/apache/dubbo/registry/etcd/EtcdRegistry.java
+++ b/dubbo-registry/dubbo-registry-etcd3/src/main/java/org/apache/dubbo/registry/etcd/EtcdRegistry.java
@@ -71,7 +71,7 @@ public class EtcdRegistry extends FailbackRegistry {
 
     private final Set<String> anyServices = new ConcurrentHashSet<String>();
 
-    private final ConcurrentMap<URL, ConcurrentMap<NotifyListener, ChildListener>> etcdListeners = new ConcurrentHashMap<URL, ConcurrentMap<NotifyListener, ChildListener>>();
+    private final ConcurrentMap<URL, ConcurrentMap<NotifyListener, ChildListener>> etcdListeners = new ConcurrentHashMap<>();
     private final EtcdClient etcdClient;
     private long expirePeriod;
 
@@ -86,14 +86,12 @@ public class EtcdRegistry extends FailbackRegistry {
         }
         this.root = group;
         etcdClient = etcdTransporter.connect(url);
-        etcdClient.addStateListener(new StateListener() {
-            public void stateChanged(int state) {
-                if (state == CONNECTED) {
-                    try {
-                        recover();
-                    } catch (Exception e) {
-                        logger.error(e.getMessage(), e);
-                    }
+        etcdClient.addStateListener(state -> {
+            if (state == StateListener.CONNECTED) {
+                try {
+                    recover();
+                } catch (Exception e) {
+                    logger.error(e.getMessage(), e);
                 }
             }
         });
@@ -345,7 +343,7 @@ public class EtcdRegistry extends FailbackRegistry {
     }
 
     protected List<URL> toUrlsWithoutEmpty(URL consumer, List<String> providers) {
-        List<URL> urls = new ArrayList<URL>();
+        List<URL> urls = new ArrayList<>();
         if (providers != null && providers.size() > 0) {
             for (String provider : providers) {
                 provider = URL.decode(provider);
diff --git a/dubbo-remoting/dubbo-remoting-etcd3/src/main/java/org/apache/dubbo/remoting/etcd/EtcdClient.java b/dubbo-remoting/dubbo-remoting-etcd3/src/main/java/org/apache/dubbo/remoting/etcd/EtcdClient.java
index b1e765d..286be93 100644
--- a/dubbo-remoting/dubbo-remoting-etcd3/src/main/java/org/apache/dubbo/remoting/etcd/EtcdClient.java
+++ b/dubbo-remoting/dubbo-remoting-etcd3/src/main/java/org/apache/dubbo/remoting/etcd/EtcdClient.java
@@ -164,4 +164,20 @@ public interface EtcdClient {
      */
     void revokeLease(long lease);
 
+
+    /**
+     * Get the value of the specified key.
+     * @param key the specified key
+     * @return null if the value is not found
+     */
+    String getKVValue(String key);
+
+    /**
+     * Put the key value pair to etcd
+     * @param key the specified key
+     * @param value the paired value
+     * @return true if the put succeeded
+     */
+    boolean put(String key, String value);
+
 }
diff --git a/dubbo-remoting/dubbo-remoting-etcd3/src/main/java/org/apache/dubbo/remoting/etcd/jetcd/ConnectionStateListener.java b/dubbo-remoting/dubbo-remoting-etcd3/src/main/java/org/apache/dubbo/remoting/etcd/jetcd/ConnectionStateListener.java
new file mode 100644
index 0000000..788aa40
--- /dev/null
+++ b/dubbo-remoting/dubbo-remoting-etcd3/src/main/java/org/apache/dubbo/remoting/etcd/jetcd/ConnectionStateListener.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dubbo.remoting.etcd.jetcd;
+
+import io.etcd.jetcd.Client;
+
+public interface ConnectionStateListener {
+
+    /**
+     * Called when there is a state change in the connection
+     *
+     * @param client   the jetcd client whose connection state changed
+     * @param newState the new state; NOTE(review): presumably one of the StateListener constants such as CONNECTED - confirm against the caller
+     */
+    void stateChanged(Client client, int newState);
+}
\ No newline at end of file
diff --git a/dubbo-remoting/dubbo-remoting-etcd3/src/main/java/org/apache/dubbo/remoting/etcd/jetcd/JEtcdClient.java b/dubbo-remoting/dubbo-remoting-etcd3/src/main/java/org/apache/dubbo/remoting/etcd/jetcd/JEtcdClient.java
index d07cad0..ff4c118 100644
--- a/dubbo-remoting/dubbo-remoting-etcd3/src/main/java/org/apache/dubbo/remoting/etcd/jetcd/JEtcdClient.java
+++ b/dubbo-remoting/dubbo-remoting-etcd3/src/main/java/org/apache/dubbo/remoting/etcd/jetcd/JEtcdClient.java
@@ -17,6 +17,7 @@
 
 package org.apache.dubbo.remoting.etcd.jetcd;
 
+import io.grpc.ManagedChannel;
 import org.apache.dubbo.common.Constants;
 import org.apache.dubbo.common.URL;
 import org.apache.dubbo.common.logger.Logger;
@@ -185,6 +186,20 @@ public class JEtcdClient extends AbstractEtcdClient<JEtcdClient.EtcdWatcher> {
         }
     }
 
+    @Override
+    public String getKVValue(String key) {
+        return clientWrapper.getKVValue(key);
+    }
+
+    @Override
+    public boolean put(String key, String value) {
+        return clientWrapper.put(key, value);
+    }
+
+    public ManagedChannel getChannel() {
+        return clientWrapper.getChannel();
+    }
+
     public class EtcdWatcher implements StreamObserver<WatchResponse> {
 
         protected WatchGrpc.WatchStub watchStub;
@@ -233,12 +248,7 @@ public class JEtcdClient extends AbstractEtcdClient<JEtcdClient.EtcdWatcher> {
                     }
                 }
                 if (modified > 0) {
-                    notifyExecutor.execute(new Runnable() {
-                        @Override
-                        public void run() {
-                            listener.childChanged(path, new ArrayList<>(urls));
-                        }
-                    });
+                    notifyExecutor.execute(() -> listener.childChanged(path, new ArrayList<>(urls)));
                 }
 
             }
diff --git a/dubbo-remoting/dubbo-remoting-etcd3/src/main/java/org/apache/dubbo/remoting/etcd/jetcd/JEtcdClientWrapper.java b/dubbo-remoting/dubbo-remoting-etcd3/src/main/java/org/apache/dubbo/remoting/etcd/jetcd/JEtcdClientWrapper.java
index 8515b61..c7f472d 100644
--- a/dubbo-remoting/dubbo-remoting-etcd3/src/main/java/org/apache/dubbo/remoting/etcd/jetcd/JEtcdClientWrapper.java
+++ b/dubbo-remoting/dubbo-remoting-etcd3/src/main/java/org/apache/dubbo/remoting/etcd/jetcd/JEtcdClientWrapper.java
@@ -17,6 +17,7 @@
 
 package org.apache.dubbo.remoting.etcd.jetcd;
 
+import io.etcd.jetcd.kv.PutResponse;
 import org.apache.dubbo.common.URL;
 import org.apache.dubbo.common.logger.Logger;
 import org.apache.dubbo.common.logger.LoggerFactory;
@@ -31,9 +32,11 @@ import io.etcd.jetcd.ByteSequence;
 import io.etcd.jetcd.Client;
 import io.etcd.jetcd.ClientBuilder;
 import io.etcd.jetcd.CloseableClient;
+import io.etcd.jetcd.KeyValue;
 import io.etcd.jetcd.Observers;
 import io.etcd.jetcd.common.exception.ErrorCode;
 import io.etcd.jetcd.common.exception.EtcdException;
+import io.etcd.jetcd.kv.GetResponse;
 import io.etcd.jetcd.lease.LeaseKeepAliveResponse;
 import io.etcd.jetcd.options.GetOption;
 import io.etcd.jetcd.options.PutOption;
@@ -87,7 +90,8 @@ public class JEtcdClientWrapper {
     private RuntimeException failed;
 
     private final ScheduledFuture<?> retryFuture;
-    private final ScheduledExecutorService retryExecutor = Executors.newScheduledThreadPool(1, new NamedThreadFactory("Etcd3RegistryKeepAliveFailedRetryTimer", true));
+    private final ScheduledExecutorService retryExecutor = Executors.newScheduledThreadPool(1,
+            new NamedThreadFactory("Etcd3RegistryKeepAliveFailedRetryTimer", true));
 
     private final Set<String> failedRegistered = new ConcurrentHashSet<String>();
 
@@ -117,28 +121,26 @@ public class JEtcdClientWrapper {
 
         this.failed = new IllegalStateException("Etcd3 registry is not connected yet, url:" + url);
         int retryPeriod = url.getParameter(Constants.REGISTRY_RETRY_PERIOD_KEY, Constants.DEFAULT_REGISTRY_RETRY_PERIOD);
-        this.retryFuture = retryExecutor.scheduleWithFixedDelay(new Runnable() {
-            public void run() {
-                try {
-                    retry();
-                } catch (Throwable t) {
-                    logger.error("Unexpected error occur at failed retry, cause: " + t.getMessage(), t);
-                }
+        this.retryFuture = retryExecutor.scheduleWithFixedDelay(() -> {
+            try {
+                retry();
+            } catch (Throwable t) {
+                logger.error("Unexpected error occur at failed retry, cause: " + t.getMessage(), t);
             }
         }, retryPeriod, retryPeriod, TimeUnit.MILLISECONDS);
     }
 
     private Client prepareClient(URL url) {
 
-        int maxInboudSize = DEFAULT_INBOUT_SIZE;
-        if (StringUtils.isNotEmpty(System.getProperty(GRPC_MAX_INBOUD_SIZE_KEY))) {
-            maxInboudSize = Integer.valueOf(System.getProperty(GRPC_MAX_INBOUD_SIZE_KEY));
+        int maxInboundSize = DEFAULT_INBOUND_SIZE;
+        if (StringUtils.isNotEmpty(System.getProperty(GRPC_MAX_INBOUND_SIZE_KEY))) {
+            maxInboundSize = Integer.valueOf(System.getProperty(GRPC_MAX_INBOUND_SIZE_KEY));
         }
 
         ClientBuilder clientBuilder = Client.builder()
                 .loadBalancerFactory(RoundRobinLoadBalancerFactory.getInstance())
                 .endpoints(endPoints(url.getBackupAddress()))
-                .maxInboundMessageSize(maxInboudSize);
+                .maxInboundMessageSize(maxInboundSize);
 
         return clientBuilder.build();
     }
@@ -170,29 +172,26 @@ public class JEtcdClientWrapper {
     public List<String> getChildren(String path) {
         try {
             return RetryLoops.invokeWithRetry(
-                    new Callable<List<String>>() {
-                        @Override
-                        public List<String> call() throws Exception {
-                            requiredNotNull(client, failed);
-                            int len = path.length();
-                            return client.getKVClient()
-                                    .get(ByteSequence.from(path, UTF_8),
-                                            GetOption.newBuilder().withPrefix(ByteSequence.from(path, UTF_8)).build())
-                                    .get(DEFAULT_REQUEST_TIMEOUT, TimeUnit.MILLISECONDS)
-                                    .getKvs().stream().parallel()
-                                    .filter(pair -> {
-                                        String key = pair.getKey().toString(UTF_8);
-                                        int index = len, count = 0;
-                                        if (key.length() > len) {
-                                            for (; (index = key.indexOf(Constants.PATH_SEPARATOR, index)) != -1; ++index) {
-                                                if (count++ > 1) break;
-                                            }
+                    () -> {
+                        requiredNotNull(client, failed);
+                        int len = path.length();
+                        return client.getKVClient()
+                                .get(ByteSequence.from(path, UTF_8),
+                                        GetOption.newBuilder().withPrefix(ByteSequence.from(path, UTF_8)).build())
+                                .get(DEFAULT_REQUEST_TIMEOUT, TimeUnit.MILLISECONDS)
+                                .getKvs().stream().parallel()
+                                .filter(pair -> {
+                                    String key = pair.getKey().toString(UTF_8);
+                                    int index = len, count = 0;
+                                    if (key.length() > len) {
+                                        for (; (index = key.indexOf(Constants.PATH_SEPARATOR, index)) != -1; ++index) {
+                                            if (count++ > 1) break;
                                         }
-                                        return count == 1;
-                                    })
-                                    .map(pair -> pair.getKey().toString(UTF_8))
-                                    .collect(toList());
-                        }
+                                    }
+                                    return count == 1;
+                                })
+                                .map(pair -> pair.getKey().toString(UTF_8))
+                                .collect(toList());
                     }, retryPolicy);
         } catch (Exception e) {
             throw new IllegalStateException(e.getMessage(), e);
@@ -207,15 +206,12 @@ public class JEtcdClientWrapper {
     public long createLease(long second) {
         try {
             return RetryLoops.invokeWithRetry(
-                    new Callable<Long>() {
-                        @Override
-                        public Long call() throws Exception {
-                            requiredNotNull(client, failed);
-                            return client.getLeaseClient()
-                                    .grant(second)
-                                    .get(DEFAULT_REQUEST_TIMEOUT, TimeUnit.MILLISECONDS)
-                                    .getID();
-                        }
+                    () -> {
+                        requiredNotNull(client, failed);
+                        return client.getLeaseClient()
+                                .grant(second)
+                                .get(DEFAULT_REQUEST_TIMEOUT, TimeUnit.MILLISECONDS)
+                                .getID();
                     }, retryPolicy);
         } catch (Exception e) {
             throw new IllegalStateException(e.getMessage(), e);
@@ -225,15 +221,12 @@ public class JEtcdClientWrapper {
     public void revokeLease(long lease) {
         try {
             RetryLoops.invokeWithRetry(
-                    new Callable<Void>() {
-                        @Override
-                        public Void call() throws Exception {
-                            requiredNotNull(client, failed);
-                            client.getLeaseClient()
-                                    .revoke(lease)
-                                    .get(DEFAULT_REQUEST_TIMEOUT, TimeUnit.MILLISECONDS);
-                            return null;
-                        }
+                    (Callable<Void>) () -> {
+                        requiredNotNull(client, failed);
+                        client.getLeaseClient()
+                                .revoke(lease)
+                                .get(DEFAULT_REQUEST_TIMEOUT, TimeUnit.MILLISECONDS);
+                        return null;
                     }, retryPolicy);
         } catch (Exception e) {
             throw new IllegalStateException(e.getMessage(), e);
@@ -260,15 +253,12 @@ public class JEtcdClientWrapper {
     public boolean checkExists(String path) {
         try {
             return RetryLoops.invokeWithRetry(
-                    new Callable<Boolean>() {
-                        @Override
-                        public Boolean call() throws Exception {
-                            requiredNotNull(client, failed);
-                            return client.getKVClient()
-                                    .get(ByteSequence.from(path, UTF_8), GetOption.newBuilder().withCountOnly(true).build())
-                                    .get(DEFAULT_REQUEST_TIMEOUT, TimeUnit.MILLISECONDS)
-                                    .getCount() > 0;
-                        }
+                    () -> {
+                        requiredNotNull(client, failed);
+                        return client.getKVClient()
+                                .get(ByteSequence.from(path, UTF_8), GetOption.newBuilder().withCountOnly(true).build())
+                                .get(DEFAULT_REQUEST_TIMEOUT, TimeUnit.MILLISECONDS)
+                                .getCount() > 0;
                     }, retryPolicy);
         } catch (Exception e) {
             throw new IllegalStateException(e.getMessage(), e);
@@ -281,17 +271,14 @@ public class JEtcdClientWrapper {
     protected Long find(String path) {
         try {
             return RetryLoops.invokeWithRetry(
-                    new Callable<Long>() {
-                        @Override
-                        public Long call() throws Exception {
-                            requiredNotNull(client, failed);
-                            return client.getKVClient()
-                                    .get(ByteSequence.from(path, UTF_8))
-                                    .get(DEFAULT_REQUEST_TIMEOUT, TimeUnit.MILLISECONDS)
-                                    .getKvs().stream()
-                                    .mapToLong(keyValue -> Long.valueOf(keyValue.getValue().toString(UTF_8)))
-                                    .findFirst().getAsLong();
-                        }
+                    () -> {
+                        requiredNotNull(client, failed);
+                        return client.getKVClient()
+                                .get(ByteSequence.from(path, UTF_8))
+                                .get(DEFAULT_REQUEST_TIMEOUT, TimeUnit.MILLISECONDS)
+                                .getKvs().stream()
+                                .mapToLong(keyValue -> Long.valueOf(keyValue.getValue().toString(UTF_8)))
+                                .findFirst().getAsLong();
                     }, retryPolicy);
         } catch (Exception e) {
             throw new IllegalStateException(e.getMessage(), e);
@@ -301,16 +288,13 @@ public class JEtcdClientWrapper {
     public void createPersistent(String path) {
         try {
             RetryLoops.invokeWithRetry(
-                    new Callable<Void>() {
-                        @Override
-                        public Void call() throws Exception {
-                            requiredNotNull(client, failed);
-                            client.getKVClient()
-                                    .put(ByteSequence.from(path, UTF_8),
-                                            ByteSequence.from(String.valueOf(path.hashCode()), UTF_8))
-                                    .get(DEFAULT_REQUEST_TIMEOUT, TimeUnit.MILLISECONDS);
-                            return null;
-                        }
+                    (Callable<Void>) () -> {
+                        requiredNotNull(client, failed);
+                        client.getKVClient()
+                                .put(ByteSequence.from(path, UTF_8),
+                                        ByteSequence.from(String.valueOf(path.hashCode()), UTF_8))
+                                .get(DEFAULT_REQUEST_TIMEOUT, TimeUnit.MILLISECONDS);
+                        return null;
                     }, retryPolicy);
         } catch (Exception e) {
             throw new IllegalStateException(e.getMessage(), e);
@@ -328,21 +312,18 @@ public class JEtcdClientWrapper {
     public long createEphemeral(String path) {
         try {
             return RetryLoops.invokeWithRetry(
-                    new Callable<Long>() {
-                        @Override
-                        public Long call() throws Exception {
-                            requiredNotNull(client, failed);
-
-                            registeredPaths.add(path);
-                            keepAlive();
-                            final long leaseId = globalLeaseId;
-                            client.getKVClient()
-                                    .put(ByteSequence.from(path, UTF_8)
-                                            , ByteSequence.from(String.valueOf(leaseId), UTF_8)
-                                            , PutOption.newBuilder().withLeaseId(leaseId).build())
-                                    .get(DEFAULT_REQUEST_TIMEOUT, TimeUnit.MILLISECONDS);
-                            return leaseId;
-                        }
+                    () -> {
+                        requiredNotNull(client, failed);
+
+                        registeredPaths.add(path);
+                        keepAlive();
+                        final long leaseId = globalLeaseId;
+                        client.getKVClient()
+                                .put(ByteSequence.from(path, UTF_8)
+                                        , ByteSequence.from(String.valueOf(leaseId), UTF_8)
+                                        , PutOption.newBuilder().withLeaseId(leaseId).build())
+                                .get(DEFAULT_REQUEST_TIMEOUT, TimeUnit.MILLISECONDS);
+                        return leaseId;
                     }, retryPolicy);
         } catch (Exception e) {
             throw new IllegalStateException(e.getMessage(), e);
@@ -354,6 +335,7 @@ public class JEtcdClientWrapper {
         this.keepAlive(lease, null);
     }
 
+    @SuppressWarnings("unchecked")
     private <T> void keepAlive(long lease, Consumer<T> onFailed) {
         final StreamObserver<LeaseKeepAliveResponse> observer = new Observers.Builder()
                 .onError((e) -> {
@@ -471,16 +453,13 @@ public class JEtcdClientWrapper {
     public void delete(String path) {
         try {
             RetryLoops.invokeWithRetry(
-                    new Callable<Void>() {
-                        @Override
-                        public Void call() throws Exception {
-                            requiredNotNull(client, failed);
-                            client.getKVClient()
-                                    .delete(ByteSequence.from(path, UTF_8))
-                                    .get(DEFAULT_REQUEST_TIMEOUT, TimeUnit.MILLISECONDS);
-                            registeredPaths.remove(path);
-                            return null;
-                        }
+                    (Callable<Void>) () -> {
+                        requiredNotNull(client, failed);
+                        client.getKVClient()
+                                .delete(ByteSequence.from(path, UTF_8))
+                                .get(DEFAULT_REQUEST_TIMEOUT, TimeUnit.MILLISECONDS);
+                        registeredPaths.remove(path);
+                        return null;
                     }, retryPolicy);
         } catch (Exception e) {
             throw new IllegalStateException(e.getMessage(), e);
@@ -494,13 +473,13 @@ public class JEtcdClientWrapper {
 
     public String[] endPoints(String backupAddress) {
         String[] endpoints = backupAddress.split(Constants.COMMA_SEPARATOR);
-        List<String> addressess = Arrays.stream(endpoints)
-                .map(address -> address.indexOf(Constants.HTTP_SUBFIX_KEY) > -1
+        List<String> addresses = Arrays.stream(endpoints)
+                .map(address -> address.contains(Constants.HTTP_SUBFIX_KEY)
                         ? address
                         : Constants.HTTP_KEY + address)
                 .collect(toList());
-        Collections.shuffle(addressess);
-        return addressess.toArray(new String[0]);
+        Collections.shuffle(addresses);
+        return addresses.toArray(new String[0]);
     }
 
     /**
@@ -527,26 +506,22 @@ public class JEtcdClientWrapper {
             }
 
             try {
-                this.future = reconnectNotify.scheduleWithFixedDelay(new Runnable() {
-                    @Override
-                    public void run() {
-                        boolean connected = isConnected();
-                        if (connectState != connected) {
-                            int notifyState = connected ? StateListener.CONNECTED : StateListener.DISCONNECTED;
-                            if (connectionStateListener != null) {
-                                try {
-                                    if (connected) {
-                                        clearKeepAlive();
-                                    }
-                                    connectionStateListener.stateChanged(getClient(), notifyState);
-                                } finally {
-                                    cancelKeepAlive = false;
+                this.future = reconnectNotify.scheduleWithFixedDelay(() -> {
+                    boolean connected = isConnected();
+                    if (connectState != connected) {
+                        int notifyState = connected ? StateListener.CONNECTED : StateListener.DISCONNECTED;
+                        if (connectionStateListener != null) {
+                            try {
+                                if (connected) {
+                                    clearKeepAlive();
                                 }
+                                connectionStateListener.stateChanged(getClient(), notifyState);
+                            } finally {
+                                cancelKeepAlive = false;
                             }
-                            connectState = connected;
                         }
+                        connectState = connected;
                     }
-
                 }, Constants.DEFAULT_REGISTRY_RECONNECT_PERIOD, Constants.DEFAULT_REGISTRY_RECONNECT_PERIOD, TimeUnit.MILLISECONDS);
             } catch (Throwable t) {
                 logger.error("monitor reconnect status failed.", t);
@@ -575,7 +550,9 @@ public class JEtcdClientWrapper {
 
         try {
             cancelKeepAlive = true;
-            revokeLease(this.globalLeaseId);
+            if (globalLeaseId > 0) {
+                revokeLease(this.globalLeaseId);
+            }
         } catch (Exception e) {
             logger.warn("revoke global lease '" + globalLeaseId + "' failed, registry: " + url, e);
         }
@@ -638,6 +615,41 @@ public class JEtcdClientWrapper {
         }
     }
 
+    /** Returns the UTF-8 value stored under {@code key}, or null when the key is null, absent, or the read fails. */
+    public String getKVValue(String key) {
+        if (null == key) {
+            return null;
+        }
+        CompletableFuture<GetResponse> responseFuture = this.client.getKVClient().get(ByteSequence.from(key, UTF_8));
+
+        try {
+            List<KeyValue> result = responseFuture.get(DEFAULT_REQUEST_TIMEOUT, TimeUnit.MILLISECONDS).getKvs();
+            if (!result.isEmpty()) {
+                return result.get(0).getValue().toString(UTF_8);
+            }
+        } catch (Exception e) {
+            logger.warn("Failed to get value of key '" + key + "' from etcd, cause: " + e.getMessage(), e);
+        }
+
+        return null;
+    }
+
+    /** Writes {@code value} under {@code key}; returns true only if the put completed within the request timeout. */
+    public boolean put(String key, String value) {
+        if (key == null || value == null) {
+            return false;
+        }
+        CompletableFuture<PutResponse> putFuture =
+                this.client.getKVClient().put(ByteSequence.from(key, UTF_8), ByteSequence.from(value, UTF_8));
+        try {
+            putFuture.get(DEFAULT_REQUEST_TIMEOUT, TimeUnit.MILLISECONDS);
+            return true;
+        } catch (Exception e) {
+            logger.warn("Failed to put value to key '" + key + "' in etcd, cause: " + e.getMessage(), e);
+        }
+        return false;
+    }
+
     private void retry() {
         if (!failedRegistered.isEmpty()) {
             Set<String> failed = new HashSet<String>(failedRegistered);
@@ -679,24 +691,14 @@ public class JEtcdClientWrapper {
         }
     }
 
-    public interface ConnectionStateListener {
-        /**
-         * Called when there is a state change in the connection
-         *
-         * @param client   the client
-         * @param newState the new state
-         */
-        public void stateChanged(Client client, int newState);
-    }
-
     /**
      * default request timeout
      */
     public static final long DEFAULT_REQUEST_TIMEOUT = obtainRequestTimeout();
 
-    public static final int DEFAULT_INBOUT_SIZE = 100 * 1024 * 1024;
+    public static final int DEFAULT_INBOUND_SIZE = 100 * 1024 * 1024;
 
-    public static final String GRPC_MAX_INBOUD_SIZE_KEY = "grpc.max.inbound.size";
+    public static final String GRPC_MAX_INBOUND_SIZE_KEY = "grpc.max.inbound.size";
 
     public static final String ETCD_REQUEST_TIMEOUT_KEY = "etcd.request.timeout";
 
diff --git a/dubbo-remoting/dubbo-remoting-etcd3/src/main/java/org/apache/dubbo/remoting/etcd/support/AbstractEtcdClient.java b/dubbo-remoting/dubbo-remoting-etcd3/src/main/java/org/apache/dubbo/remoting/etcd/support/AbstractEtcdClient.java
index 31752bf..5fecd14 100644
--- a/dubbo-remoting/dubbo-remoting-etcd3/src/main/java/org/apache/dubbo/remoting/etcd/support/AbstractEtcdClient.java
+++ b/dubbo-remoting/dubbo-remoting-etcd3/src/main/java/org/apache/dubbo/remoting/etcd/support/AbstractEtcdClient.java
@@ -57,7 +57,7 @@ public abstract class AbstractEtcdClient<WatcherListener> implements EtcdClient
 
     private final Set<StateListener> stateListeners = new ConcurrentHashSet<>();
 
-    private final ConcurrentMap<String, ConcurrentMap<ChildListener, WatcherListener>> childListeners = new ConcurrentHashMap<String, ConcurrentMap<ChildListener, WatcherListener>>();
+    private final ConcurrentMap<String, ConcurrentMap<ChildListener, WatcherListener>> childListeners = new ConcurrentHashMap<>();
     private final List<String> categroies = Arrays.asList(Constants.PROVIDERS_CATEGORY
             , Constants.CONSUMERS_CATEGORY
             , Constants.ROUTERS_CATEGORY
@@ -99,7 +99,7 @@ public abstract class AbstractEtcdClient<WatcherListener> implements EtcdClient
     public List<String> addChildListener(String path, final ChildListener listener) {
         ConcurrentMap<ChildListener, WatcherListener> listeners = childListeners.get(path);
         if (listeners == null) {
-            childListeners.putIfAbsent(path, new ConcurrentHashMap<ChildListener, WatcherListener>());
+            childListeners.putIfAbsent(path, new ConcurrentHashMap<>());
             listeners = childListeners.get(path);
         }
         WatcherListener targetListener = listeners.get(listener);
diff --git a/dubbo-remoting/dubbo-remoting-etcd3/src/test/java/org/apache/dubbo/remoting/etcd/jetcd/JEtcdClientTest.java b/dubbo-remoting/dubbo-remoting-etcd3/src/test/java/org/apache/dubbo/remoting/etcd/jetcd/JEtcdClientTest.java
index 19254ab..9674fee 100644
--- a/dubbo-remoting/dubbo-remoting-etcd3/src/test/java/org/apache/dubbo/remoting/etcd/jetcd/JEtcdClientTest.java
+++ b/dubbo-remoting/dubbo-remoting-etcd3/src/test/java/org/apache/dubbo/remoting/etcd/jetcd/JEtcdClientTest.java
@@ -33,8 +33,21 @@
  */
 package org.apache.dubbo.remoting.etcd.jetcd;
 
+import com.google.protobuf.ByteString;
+import io.etcd.jetcd.ByteSequence;
+import io.etcd.jetcd.Client;
+import io.etcd.jetcd.Watch;
+import io.etcd.jetcd.api.Event;
+import io.etcd.jetcd.api.WatchCancelRequest;
+import io.etcd.jetcd.api.WatchCreateRequest;
+import io.etcd.jetcd.api.WatchGrpc;
+import io.etcd.jetcd.api.WatchRequest;
+import io.etcd.jetcd.api.WatchResponse;
 import io.etcd.jetcd.common.exception.ClosedClientException;
+import io.etcd.jetcd.watch.WatchEvent;
+import io.grpc.ManagedChannel;
 import io.grpc.Status;
+import io.grpc.stub.StreamObserver;
 import org.apache.dubbo.common.Constants;
 import org.apache.dubbo.common.URL;
 import org.apache.dubbo.remoting.etcd.ChildListener;
@@ -44,8 +57,13 @@ import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Disabled;
 import org.junit.jupiter.api.Test;
 
+import java.lang.reflect.Field;
+import java.lang.reflect.Method;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
 
 @Disabled
 public class JEtcdClientTest {
@@ -76,6 +94,139 @@ public class JEtcdClientTest {
     }
 
     @Test
+    public void test_watch_when_modify() {
+        // Watches a key through the jetcd Watch API, puts "Hello" to it and expects the matching PUT event.
+        String path = "/dubbo/config/jetcd-client-unit-test/configurators";
+        String endpoint = "http://127.0.0.1:2379";
+        CountDownLatch latch = new CountDownLatch(1);
+        ByteSequence key = ByteSequence.from(path, UTF_8);
+
+        Watch.Listener listener = Watch.listener(response -> {
+            for (WatchEvent event : response.getEvents()) {
+                Assertions.assertEquals("PUT", event.getEventType().toString());
+                Assertions.assertEquals(path, event.getKeyValue().getKey().toString(UTF_8));
+                Assertions.assertEquals("Hello", event.getKeyValue().getValue().toString(UTF_8));
+                latch.countDown();
+            }
+        });
+
+        try (Client client = Client.builder().endpoints(endpoint).build();
+             Watch watch = client.getWatchClient();
+             Watch.Watcher watcher = watch.watch(key, listener)) {
+            // try to modify the key
+            client.getKVClient().put(ByteSequence.from(path, UTF_8), ByteSequence.from("Hello", UTF_8));
+            Assertions.assertTrue(latch.await(5, TimeUnit.SECONDS)); // bounded: fail instead of hanging if no event arrives
+        } catch (Exception e) {
+            Assertions.fail(e.getMessage());
+        }
+    }
+
+    /** Watches a key through the raw gRPC watch stub and expects the PUT event for the value written below. */
+    @Test
+    public void testWatchWithGrpc() {
+        String path = "/dubbo/config/test_watch_with_grpc/configurators";
+        String endpoint = "http://127.0.0.1:2379";
+        CountDownLatch latch = new CountDownLatch(1);
+        try (Client client = Client.builder().endpoints(endpoint).build()) {
+            ManagedChannel channel = getChannel(client);
+            StreamObserver<WatchRequest> observer = WatchGrpc.newStub(channel).watch(new StreamObserver<WatchResponse>() {
+                @Override
+                public void onNext(WatchResponse response) {
+                    for (Event event : response.getEventsList()) {
+                        Assertions.assertEquals("PUT", event.getType().toString());
+                        Assertions.assertEquals(path, event.getKv().getKey().toString(UTF_8));
+                        Assertions.assertEquals("Hello", event.getKv().getValue().toString(UTF_8));
+                        latch.countDown();
+                    }
+                }
+
+                @Override
+                public void onError(Throwable throwable) {
+                    // intentionally empty: a stream error leaves the latch uncounted, which fails the await below
+                }
+
+                @Override
+                public void onCompleted() {
+                    // intentionally empty: nothing to do when the stream completes
+                }
+            });
+            WatchCreateRequest.Builder builder = WatchCreateRequest.newBuilder()
+                    .setKey(ByteString.copyFrom(path, UTF_8));
+            observer.onNext(WatchRequest.newBuilder().setCreateRequest(builder).build());
+
+            // try to modify the key
+            client.getKVClient().put(ByteSequence.from(path, UTF_8), ByteSequence.from("Hello", UTF_8));
+            Assertions.assertTrue(latch.await(5, TimeUnit.SECONDS)); // the timeout result was previously discarded
+        } catch (Exception e) {
+            Assertions.fail(e.getMessage());
+        }
+    }
+
+    /** Creates a gRPC watch, waits for one PUT event, cancels the watch by id and expects the cancel response. */
+    @Test
+    public void testCancelWatchWithGrpc() {
+        String path = "/dubbo/config/testCancelWatchWithGrpc/configurators";
+        String endpoint = "http://127.0.0.1:2379";
+        CountDownLatch updateLatch = new CountDownLatch(1);
+        CountDownLatch cancelLatch = new CountDownLatch(1);
+        final AtomicLong watchID = new AtomicLong(-1L);
+        try (Client client = Client.builder().endpoints(endpoint).build()) {
+            ManagedChannel channel = getChannel(client);
+            StreamObserver<WatchRequest> observer = WatchGrpc.newStub(channel).watch(new StreamObserver<WatchResponse>() {
+                @Override
+                public void onNext(WatchResponse response) {
+                    watchID.set(response.getWatchId());
+                    for (Event event : response.getEventsList()) {
+                        Assertions.assertEquals("PUT", event.getType().toString());
+                        Assertions.assertEquals(path, event.getKv().getKey().toString(UTF_8));
+                        Assertions.assertEquals("Hello", event.getKv().getValue().toString(UTF_8));
+                        updateLatch.countDown();
+                    }
+                    if (response.getCanceled()) {
+                        // received the cancel response
+                        cancelLatch.countDown();
+                    }
+                }
+
+                @Override
+                public void onError(Throwable throwable) {
+                    // intentionally empty: a stream error leaves the latches uncounted, which fails the awaits below
+                }
+
+                @Override
+                public void onCompleted() {
+                    // intentionally empty: nothing to do when the stream completes
+                }
+            });
+            // create
+            WatchCreateRequest.Builder builder = WatchCreateRequest.newBuilder()
+                    .setKey(ByteString.copyFrom(path, UTF_8));
+
+            // make the grpc call to watch the key
+            observer.onNext(WatchRequest.newBuilder().setCreateRequest(builder).build());
+
+            // try to put the value
+            client.getKVClient().put(ByteSequence.from(path, UTF_8), ByteSequence.from("Hello", UTF_8));
+
+            // response received, latch counts down to zero (bounded so a missed event fails instead of hanging)
+            Assertions.assertTrue(updateLatch.await(5, TimeUnit.SECONDS));
+
+            WatchCancelRequest watchCancelRequest =
+                    WatchCancelRequest.newBuilder().setWatchId(watchID.get()).build();
+            WatchRequest cancelRequest = WatchRequest.newBuilder()
+                    .setCancelRequest(watchCancelRequest).build();
+            observer.onNext(cancelRequest);
+
+            // try to put the value
+            client.getKVClient().put(ByteSequence.from(path, UTF_8), ByteSequence.from("Hello world", UTF_8));
+
+            Assertions.assertTrue(cancelLatch.await(5, TimeUnit.SECONDS));
+        } catch (Exception e) {
+            Assertions.fail(e.getMessage());
+        }
+    }
+
+    @Test
     public void test_watch_when_create_wrong_path() throws InterruptedException {
 
         String path = "/dubbo/com.alibaba.dubbo.demo.DemoService/providers";
@@ -257,4 +408,19 @@ public class JEtcdClientTest {
             return ++value;
         }
     }
+
+    /** Test-only helper: reflectively reads the client's declared connectionManager field and returns its channel. */
+    private ManagedChannel getChannel(Client client) {
+        try {
+            // hack, use reflection to get the shared channel.
+            Field connectionField = client.getClass().getDeclaredField("connectionManager");
+            connectionField.setAccessible(true);
+            Object connection = connectionField.get(client);
+            Method channelMethod = connection.getClass().getDeclaredMethod("getChannel");
+            channelMethod.setAccessible(true);
+            return (ManagedChannel) channelMethod.invoke(connection);
+        } catch (Exception e) {
+            throw new IllegalStateException("Cannot obtain the gRPC channel from the jetcd client via reflection", e);
+        }
+    }
 }