You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dubbo.apache.org by li...@apache.org on 2019/03/14 11:14:09 UTC

[incubator-dubbo] branch master updated: Merge pull request #3603, configcenter share zookeeper connection with registry.

This is an automated email from the ASF dual-hosted git repository.

liujun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-dubbo.git


The following commit(s) were added to refs/heads/master by this push:
     new 9598cd0  Merge pull request #3603, configcenter share zookeeper connection with registry.
9598cd0 is described below

commit 9598cd09607b2dbe660ae16f4a95c79e107907a8
Author: cvictory <sh...@gmail.com>
AuthorDate: Thu Mar 14 19:14:03 2019 +0800

    Merge pull request #3603, configcenter share zookeeper connection with registry.
    
    Fixes #3288
---
 .../dubbo/configcenter/ConfigChangeEvent.java      |  10 +-
 .../dubbo-configcenter-zookeeper/pom.xml           |  15 +-
 .../support/zookeeper/CacheListener.java           |  94 ++++++------
 .../zookeeper/ZookeeperDynamicConfiguration.java   |  71 ++-------
 .../ZookeeperDynamicConfigurationFactory.java      |  11 +-
 .../ZookeeperDynamicConfigurationTest.java         |   1 +
 dubbo-dependencies-bom/pom.xml                     |  14 +-
 .../dubbo-dependencies-zookeeper/pom.xml           |   4 -
 dubbo-remoting/dubbo-remoting-zookeeper/pom.xml    |  16 +-
 ...ZookeeperTransporter.java => DataListener.java} |  16 +-
 .../apache/dubbo/remoting/zookeeper/EventType.java |  65 ++++++++
 .../dubbo/remoting/zookeeper/ZookeeperClient.java  |  16 ++
 .../zookeeper/curator/CuratorZookeeperClient.java  | 158 ++++++++++++++++---
 .../zookeeper/support/AbstractZookeeperClient.java |  45 +++++-
 .../zookeeper/zkclient/ZkClientWrapper.java        | 144 ------------------
 .../zkclient/ZkclientZookeeperClient.java          | 168 ---------------------
 ...e.dubbo.remoting.zookeeper.ZookeeperTransporter |   3 +-
 .../curator/CuratorZookeeperClientTest.java        |  44 +++++-
 .../zookeeper/zkclient/ZkClientWrapperTest.java    |  56 -------
 .../zkclient/ZkclientZookeeperClientTest.java      | 140 -----------------
 .../zkclient/ZkclientZookeeperTransporterTest.java |  53 -------
 21 files changed, 396 insertions(+), 748 deletions(-)

diff --git a/dubbo-configcenter/dubbo-configcenter-api/src/main/java/org/apache/dubbo/configcenter/ConfigChangeEvent.java b/dubbo-configcenter/dubbo-configcenter-api/src/main/java/org/apache/dubbo/configcenter/ConfigChangeEvent.java
index 4a2190a..cdedd15 100644
--- a/dubbo-configcenter/dubbo-configcenter-api/src/main/java/org/apache/dubbo/configcenter/ConfigChangeEvent.java
+++ b/dubbo-configcenter/dubbo-configcenter-api/src/main/java/org/apache/dubbo/configcenter/ConfigChangeEvent.java
@@ -49,4 +49,12 @@ public class ConfigChangeEvent {
         return changeType;
     }
 
-}
\ No newline at end of file
+    @Override
+    public String toString() {
+        return "ConfigChangeEvent{" +
+                "key='" + key + '\'' +
+                ", value='" + value + '\'' +
+                ", changeType=" + changeType +
+                '}';
+    }
+}
diff --git a/dubbo-configcenter/dubbo-configcenter-zookeeper/pom.xml b/dubbo-configcenter/dubbo-configcenter-zookeeper/pom.xml
index bb9e1ad..5c84f65 100644
--- a/dubbo-configcenter/dubbo-configcenter-zookeeper/pom.xml
+++ b/dubbo-configcenter/dubbo-configcenter-zookeeper/pom.xml
@@ -33,16 +33,9 @@
             <version>${project.parent.version}</version>
         </dependency>
         <dependency>
-            <groupId>org.apache.curator</groupId>
-            <artifactId>curator-framework</artifactId>
-        </dependency>
-        <dependency>
-            <groupId>org.apache.curator</groupId>
-            <artifactId>curator-recipes</artifactId>
-        </dependency>
-        <dependency>
-            <groupId>org.apache.zookeeper</groupId>
-            <artifactId>zookeeper</artifactId>
+            <groupId>org.apache.dubbo</groupId>
+            <artifactId>dubbo-remoting-zookeeper</artifactId>
+            <version>${project.parent.version}</version>
         </dependency>
         <dependency>
             <groupId>org.apache.curator</groupId>
@@ -50,4 +43,4 @@
             <scope>test</scope>
         </dependency>
     </dependencies>
-</project>
\ No newline at end of file
+</project>
diff --git a/dubbo-configcenter/dubbo-configcenter-zookeeper/src/main/java/org/apache/dubbo/configcenter/support/zookeeper/CacheListener.java b/dubbo-configcenter/dubbo-configcenter-zookeeper/src/main/java/org/apache/dubbo/configcenter/support/zookeeper/CacheListener.java
index 1851a22..4f6c638 100644
--- a/dubbo-configcenter/dubbo-configcenter-zookeeper/src/main/java/org/apache/dubbo/configcenter/support/zookeeper/CacheListener.java
+++ b/dubbo-configcenter/dubbo-configcenter-zookeeper/src/main/java/org/apache/dubbo/configcenter/support/zookeeper/CacheListener.java
@@ -21,13 +21,9 @@ import org.apache.dubbo.common.utils.StringUtils;
 import org.apache.dubbo.configcenter.ConfigChangeEvent;
 import org.apache.dubbo.configcenter.ConfigChangeType;
 import org.apache.dubbo.configcenter.ConfigurationListener;
+import org.apache.dubbo.remoting.zookeeper.DataListener;
+import org.apache.dubbo.remoting.zookeeper.EventType;
 
-import org.apache.curator.framework.CuratorFramework;
-import org.apache.curator.framework.recipes.cache.ChildData;
-import org.apache.curator.framework.recipes.cache.TreeCacheEvent;
-import org.apache.curator.framework.recipes.cache.TreeCacheListener;
-
-import java.nio.charset.StandardCharsets;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
@@ -37,9 +33,8 @@ import java.util.concurrent.CountDownLatch;
 /**
  *
  */
-public class CacheListener implements TreeCacheListener {
-    private static final byte[] EMPTY_BYTES = new byte[0];
 
+public class CacheListener implements DataListener {
     private Map<String, Set<ConfigurationListener>> keyListeners = new ConcurrentHashMap<>();
     private CountDownLatch initializedLatch;
     private String rootPath;
@@ -49,76 +44,73 @@ public class CacheListener implements TreeCacheListener {
         this.initializedLatch = initializedLatch;
     }
 
+    public void addListener(String key, ConfigurationListener configurationListener) {
+        Set<ConfigurationListener> listeners = this.keyListeners.computeIfAbsent(key, k -> new CopyOnWriteArraySet<>());
+        listeners.add(configurationListener);
+    }
+
+    public void removeListener(String key, ConfigurationListener configurationListener) {
+        Set<ConfigurationListener> listeners = this.keyListeners.get(key);
+        if (listeners != null) {
+            listeners.remove(configurationListener);
+        }
+    }
+
+    /**
+     * This is used to convert a configuration nodePath into a key
+     * TODO doc
+     *
+     * @param path
+     * @return key (nodePath less the config root path)
+     */
+    private String pathToKey(String path) {
+        if (StringUtils.isEmpty(path)) {
+            return path;
+        }
+        return path.replace(rootPath + "/", "").replaceAll("/", ".");
+    }
+
+
     @Override
-    public void childEvent(CuratorFramework aClient, TreeCacheEvent event) throws Exception {
+    public void dataChanged(String path, Object value, EventType eventType) {
+        if (eventType == null) {
+            return;
+        }
 
-        TreeCacheEvent.Type type = event.getType();
-        ChildData data = event.getData();
-        if (type == TreeCacheEvent.Type.INITIALIZED) {
+        if (eventType == EventType.INITIALIZED) {
             initializedLatch.countDown();
             return;
         }
 
-        // TODO, ignore other event types
-        if (data == null) {
+        if (path == null || (value == null && eventType != EventType.NodeDeleted)) {
             return;
         }
 
         // TODO We limit the notification of config changes to a specific path level, for example
         //  /dubbo/config/service/configurators, other config changes not in this level will not get notified,
         //  say /dubbo/config/dubbo.properties
-        if (data.getPath().split("/").length >= 5) {
-            byte[] value = data.getData();
-            String key = pathToKey(data.getPath());
+        if (path.split("/").length >= 5) {
+            String key = pathToKey(path);
             ConfigChangeType changeType;
-            switch (type) {
-                case NODE_ADDED:
+            switch (eventType) {
+                case NodeCreated:
                     changeType = ConfigChangeType.ADDED;
                     break;
-                case NODE_REMOVED:
+                case NodeDeleted:
                     changeType = ConfigChangeType.DELETED;
                     break;
-                case NODE_UPDATED:
+                case NodeDataChanged:
                     changeType = ConfigChangeType.MODIFIED;
                     break;
                 default:
                     return;
             }
 
-            if (value == null) {
-                value = EMPTY_BYTES;
-            }
-            ConfigChangeEvent configChangeEvent = new ConfigChangeEvent(key, new String(value, StandardCharsets.UTF_8), changeType);
+            ConfigChangeEvent configChangeEvent = new ConfigChangeEvent(key, (String) value, changeType);
             Set<ConfigurationListener> listeners = keyListeners.get(key);
             if (CollectionUtils.isNotEmpty(listeners)) {
                 listeners.forEach(listener -> listener.process(configChangeEvent));
             }
         }
     }
-
-    public void addListener(String key, ConfigurationListener configurationListener) {
-        Set<ConfigurationListener> listeners = this.keyListeners.computeIfAbsent(key, k -> new CopyOnWriteArraySet<>());
-        listeners.add(configurationListener);
-    }
-
-    public void removeListener(String key, ConfigurationListener configurationListener) {
-        Set<ConfigurationListener> listeners = this.keyListeners.get(key);
-        if (listeners != null) {
-            listeners.remove(configurationListener);
-        }
-    }
-
-    /**
-     * This is used to convert a configuration nodePath into a key
-     * TODO doc
-     *
-     * @param path
-     * @return key (nodePath less the config root path)
-     */
-    private String pathToKey(String path) {
-        if (StringUtils.isEmpty(path)) {
-            return path;
-        }
-        return path.replace(rootPath + "/", "").replaceAll("/", ".");
-    }
 }
diff --git a/dubbo-configcenter/dubbo-configcenter-zookeeper/src/main/java/org/apache/dubbo/configcenter/support/zookeeper/ZookeeperDynamicConfiguration.java b/dubbo-configcenter/dubbo-configcenter-zookeeper/src/main/java/org/apache/dubbo/configcenter/support/zookeeper/ZookeeperDynamicConfiguration.java
index 7a106f8..dac49cc 100644
--- a/dubbo-configcenter/dubbo-configcenter-zookeeper/src/main/java/org/apache/dubbo/configcenter/support/zookeeper/ZookeeperDynamicConfiguration.java
+++ b/dubbo-configcenter/dubbo-configcenter-zookeeper/src/main/java/org/apache/dubbo/configcenter/support/zookeeper/ZookeeperDynamicConfiguration.java
@@ -16,28 +16,21 @@
  */
 package org.apache.dubbo.configcenter.support.zookeeper;
 
-import org.apache.dubbo.common.Constants;
 import org.apache.dubbo.common.URL;
 import org.apache.dubbo.common.utils.NamedThreadFactory;
 import org.apache.dubbo.common.utils.StringUtils;
 import org.apache.dubbo.configcenter.ConfigurationListener;
 import org.apache.dubbo.configcenter.DynamicConfiguration;
+import org.apache.dubbo.remoting.zookeeper.ZookeeperClient;
+import org.apache.dubbo.remoting.zookeeper.ZookeeperTransporter;
 
-import org.apache.curator.RetryPolicy;
-import org.apache.curator.framework.CuratorFramework;
-import org.apache.curator.framework.recipes.cache.ChildData;
-import org.apache.curator.framework.recipes.cache.TreeCache;
-import org.apache.curator.retry.ExponentialBackoffRetry;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.nio.charset.StandardCharsets;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.Executor;
 import java.util.concurrent.Executors;
-import java.util.concurrent.TimeUnit;
 
-import static org.apache.curator.framework.CuratorFrameworkFactory.newClient;
 import static org.apache.dubbo.common.Constants.CONFIG_NAMESPACE_KEY;
 
 /**
@@ -45,52 +38,32 @@ import static org.apache.dubbo.common.Constants.CONFIG_NAMESPACE_KEY;
  */
 public class ZookeeperDynamicConfiguration implements DynamicConfiguration {
     private static final Logger logger = LoggerFactory.getLogger(ZookeeperDynamicConfiguration.class);
-    private Executor executor;
-    private CuratorFramework client;
 
+    private Executor executor;
     // The final root path would be: /configRootPath/"config"
     private String rootPath;
-    private TreeCache treeCache;
+    private final ZookeeperClient zkClient;
     private CountDownLatch initializedLatch;
 
     private CacheListener cacheListener;
     private URL url;
 
 
-    ZookeeperDynamicConfiguration(URL url) {
+    ZookeeperDynamicConfiguration(URL url, ZookeeperTransporter zookeeperTransporter) {
         this.url = url;
         rootPath = "/" + url.getParameter(CONFIG_NAMESPACE_KEY, DEFAULT_GROUP) + "/config";
 
-        RetryPolicy policy = new ExponentialBackoffRetry(1000, 3);
-        int sessionTimeout = url.getParameter("config.session.timeout", 60 * 1000);
-        int connectTimeout = url.getParameter("config.connect.timeout", 10 * 1000);
-        String connectString = url.getBackupAddress();
-        client = newClient(connectString, sessionTimeout, connectTimeout, policy);
-        client.start();
-
-        try {
-            boolean connected = client.blockUntilConnected(3 * connectTimeout, TimeUnit.MILLISECONDS);
-            if (!connected) {
-                if (url.getParameter(Constants.CONFIG_CHECK_KEY, true)) {
-                    throw new IllegalStateException("Failed to connect to config center (zookeeper): "
-                            + connectString + " in " + 3 * connectTimeout + "ms.");
-                } else {
-                    logger.warn("The config center (zookeeper) is not fully initialized in " + 3 * connectTimeout + "ms, address is: " + connectString);
-                }
-            }
-        } catch (InterruptedException e) {
-            throw new IllegalStateException("The thread was interrupted unexpectedly when trying connecting to zookeeper "
-                    + connectString + " config center, ", e);
-        }
-
         initializedLatch = new CountDownLatch(1);
         this.cacheListener = new CacheListener(rootPath, initializedLatch);
         this.executor = Executors.newFixedThreadPool(1, new NamedThreadFactory(this.getClass().getSimpleName(), true));
-        // build local cache
+
+        zkClient = zookeeperTransporter.connect(url);
+        zkClient.addDataListener(rootPath, cacheListener, executor);
         try {
-            this.buildCache();
-        } catch (Exception e) {
-            logger.warn("Failed to build local cache for config center (zookeeper), address is ." + connectString);
+            // Wait for connection
+            this.initializedLatch.await();
+        } catch (InterruptedException e) {
+            logger.warn("Failed to build local cache for config center (zookeeper)." + url);
         }
     }
 
@@ -100,11 +73,7 @@ public class ZookeeperDynamicConfiguration implements DynamicConfiguration {
      */
     @Override
     public Object getInternalProperty(String key) {
-        ChildData childData = treeCache.getCurrentData(key);
-        if (childData != null) {
-            return new String(childData.getData(), StandardCharsets.UTF_8);
-        }
-        return null;
+        return zkClient.getContent(key);
     }
 
     /**
@@ -141,18 +110,4 @@ public class ZookeeperDynamicConfiguration implements DynamicConfiguration {
 
         return (String) getInternalProperty(rootPath + "/" + key);
     }
-
-    /**
-     * Adds a listener to the pathChildrenCache, initializes the cache, then starts the cache-management background
-     * thread
-     */
-    private void buildCache() throws Exception {
-        this.treeCache = new TreeCache(client, rootPath);
-        // create the watcher for future configuration updates
-        treeCache.getListenable().addListener(cacheListener, executor);
-
-        // it's not blocking, so we use an extra latch 'initializedLatch' to make sure cache fully initialized before use.
-        treeCache.start();
-        initializedLatch.await();
-    }
 }
diff --git a/dubbo-configcenter/dubbo-configcenter-zookeeper/src/main/java/org/apache/dubbo/configcenter/support/zookeeper/ZookeeperDynamicConfigurationFactory.java b/dubbo-configcenter/dubbo-configcenter-zookeeper/src/main/java/org/apache/dubbo/configcenter/support/zookeeper/ZookeeperDynamicConfigurationFactory.java
index 7994e04..4d78133 100644
--- a/dubbo-configcenter/dubbo-configcenter-zookeeper/src/main/java/org/apache/dubbo/configcenter/support/zookeeper/ZookeeperDynamicConfigurationFactory.java
+++ b/dubbo-configcenter/dubbo-configcenter-zookeeper/src/main/java/org/apache/dubbo/configcenter/support/zookeeper/ZookeeperDynamicConfigurationFactory.java
@@ -19,13 +19,22 @@ package org.apache.dubbo.configcenter.support.zookeeper;
 import org.apache.dubbo.common.URL;
 import org.apache.dubbo.configcenter.AbstractDynamicConfigurationFactory;
 import org.apache.dubbo.configcenter.DynamicConfiguration;
+import org.apache.dubbo.remoting.zookeeper.ZookeeperTransporter;
 
 /**
  *
  */
 public class ZookeeperDynamicConfigurationFactory extends AbstractDynamicConfigurationFactory {
+
+    private ZookeeperTransporter zookeeperTransporter;
+
+    public void setZookeeperTransporter(ZookeeperTransporter zookeeperTransporter) {
+        this.zookeeperTransporter = zookeeperTransporter;
+    }
+
+
     @Override
     protected DynamicConfiguration createDynamicConfiguration(URL url) {
-        return new ZookeeperDynamicConfiguration(url);
+        return new ZookeeperDynamicConfiguration(url, zookeeperTransporter);
     }
 }
diff --git a/dubbo-configcenter/dubbo-configcenter-zookeeper/src/test/java/org/apache/dubbo/configcenter/support/zookeeper/ZookeeperDynamicConfigurationTest.java b/dubbo-configcenter/dubbo-configcenter-zookeeper/src/test/java/org/apache/dubbo/configcenter/support/zookeeper/ZookeeperDynamicConfigurationTest.java
index e1ca40f..40f9f04 100644
--- a/dubbo-configcenter/dubbo-configcenter-zookeeper/src/test/java/org/apache/dubbo/configcenter/support/zookeeper/ZookeeperDynamicConfigurationTest.java
+++ b/dubbo-configcenter/dubbo-configcenter-zookeeper/src/test/java/org/apache/dubbo/configcenter/support/zookeeper/ZookeeperDynamicConfigurationTest.java
@@ -133,6 +133,7 @@ public class ZookeeperDynamicConfigurationTest {
 
         @Override
         public void process(ConfigChangeEvent event) {
+            System.out.println(this + ": " + event);
             Integer count = countMap.computeIfAbsent(event.getKey(), k -> new Integer(0));
             countMap.put(event.getKey(), ++count);
 
diff --git a/dubbo-dependencies-bom/pom.xml b/dubbo-dependencies-bom/pom.xml
index 1b5d108..245b6ff 100644
--- a/dubbo-dependencies-bom/pom.xml
+++ b/dubbo-dependencies-bom/pom.xml
@@ -101,7 +101,6 @@
         <httpcore_version>4.4.6</httpcore_version>
         <fastjson_version>1.2.46</fastjson_version>
         <zookeeper_version>3.4.13</zookeeper_version>
-        <zkclient_version>0.2</zkclient_version>
         <curator_version>4.0.1</curator_version>
         <curator_test_version>2.12.0</curator_test_version>
         <jedis_version>2.9.0</jedis_version>
@@ -151,7 +150,7 @@
         <dependencies>
             <!-- Common libs -->
             <dependency>
-                <groupId>org.springframework</groupId>
+<groupId>org.springframework</groupId>
                 <artifactId>spring-framework-bom</artifactId>
                 <version>${spring_version}</version>
                 <type>pom</type>
@@ -209,17 +208,6 @@
                 </exclusions>
             </dependency>
             <dependency>
-                <groupId>com.101tec</groupId>
-                <artifactId>zkclient</artifactId>
-                <version>${zkclient_version}</version>
-                <exclusions>
-                    <exclusion>
-                        <groupId>org.apache.zookeeper</groupId>
-                        <artifactId>zookeeper</artifactId>
-                    </exclusion>
-                </exclusions>
-            </dependency>
-            <dependency>
                 <groupId>org.apache.curator</groupId>
                 <artifactId>curator-framework</artifactId>
                 <version>${curator_version}</version>
diff --git a/dubbo-dependencies/dubbo-dependencies-zookeeper/pom.xml b/dubbo-dependencies/dubbo-dependencies-zookeeper/pom.xml
index 7305c2b..18a3f2a 100644
--- a/dubbo-dependencies/dubbo-dependencies-zookeeper/pom.xml
+++ b/dubbo-dependencies/dubbo-dependencies-zookeeper/pom.xml
@@ -51,10 +51,6 @@
             <artifactId>curator-recipes</artifactId>
         </dependency>
         <dependency>
-            <groupId>com.101tec</groupId>
-            <artifactId>zkclient</artifactId>
-        </dependency>
-        <dependency>
             <groupId>org.apache.zookeeper</groupId>
             <artifactId>zookeeper</artifactId>
         </dependency>
diff --git a/dubbo-remoting/dubbo-remoting-zookeeper/pom.xml b/dubbo-remoting/dubbo-remoting-zookeeper/pom.xml
index 97ce12a..24b14e9 100644
--- a/dubbo-remoting/dubbo-remoting-zookeeper/pom.xml
+++ b/dubbo-remoting/dubbo-remoting-zookeeper/pom.xml
@@ -35,16 +35,10 @@
             <version>${project.parent.version}</version>
         </dependency>
         <dependency>
-            <groupId>org.apache.zookeeper</groupId>
-            <artifactId>zookeeper</artifactId>
-        </dependency>
-        <dependency>
-            <groupId>com.101tec</groupId>
-            <artifactId>zkclient</artifactId>
-        </dependency>
-        <dependency>
-            <groupId>org.apache.curator</groupId>
-            <artifactId>curator-framework</artifactId>
+            <groupId>org.apache.dubbo</groupId>
+            <artifactId>dubbo-dependencies-zookeeper</artifactId>
+            <version>${project.parent.version}</version>
+            <type>pom</type>
         </dependency>
         <dependency>
             <groupId>org.apache.curator</groupId>
@@ -52,4 +46,4 @@
             <scope>test</scope>
         </dependency>
     </dependencies>
-</project>
\ No newline at end of file
+</project>
diff --git a/dubbo-remoting/dubbo-remoting-zookeeper/src/main/java/org/apache/dubbo/remoting/zookeeper/zkclient/ZkclientZookeeperTransporter.java b/dubbo-remoting/dubbo-remoting-zookeeper/src/main/java/org/apache/dubbo/remoting/zookeeper/DataListener.java
similarity index 64%
rename from dubbo-remoting/dubbo-remoting-zookeeper/src/main/java/org/apache/dubbo/remoting/zookeeper/zkclient/ZkclientZookeeperTransporter.java
rename to dubbo-remoting/dubbo-remoting-zookeeper/src/main/java/org/apache/dubbo/remoting/zookeeper/DataListener.java
index 0ad86ff..95b948a 100644
--- a/dubbo-remoting/dubbo-remoting-zookeeper/src/main/java/org/apache/dubbo/remoting/zookeeper/zkclient/ZkclientZookeeperTransporter.java
+++ b/dubbo-remoting/dubbo-remoting-zookeeper/src/main/java/org/apache/dubbo/remoting/zookeeper/DataListener.java
@@ -14,16 +14,12 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.dubbo.remoting.zookeeper.zkclient;
+package org.apache.dubbo.remoting.zookeeper;
 
-import org.apache.dubbo.common.URL;
-import org.apache.dubbo.remoting.zookeeper.ZookeeperClient;
-import org.apache.dubbo.remoting.zookeeper.support.AbstractZookeeperTransporter;
-
-public class ZkclientZookeeperTransporter extends AbstractZookeeperTransporter {
-    @Override
-    public ZookeeperClient createZookeeperClient(URL url) {
-        return new ZkclientZookeeperClient(url);
-    }
+/**
+ * 2019-02-26
+ */
+public interface DataListener {
 
+    void dataChanged(String path, Object value, EventType eventType);
 }
diff --git a/dubbo-remoting/dubbo-remoting-zookeeper/src/main/java/org/apache/dubbo/remoting/zookeeper/EventType.java b/dubbo-remoting/dubbo-remoting-zookeeper/src/main/java/org/apache/dubbo/remoting/zookeeper/EventType.java
new file mode 100644
index 0000000..a1de037
--- /dev/null
+++ b/dubbo-remoting/dubbo-remoting-zookeeper/src/main/java/org/apache/dubbo/remoting/zookeeper/EventType.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dubbo.remoting.zookeeper;
+
+import org.apache.zookeeper.Watcher;
+
+/**
+ * 2019-02-26
+ */
+public enum EventType {
+    None(-1),
+    NodeCreated(1),
+    NodeDeleted(2),
+    NodeDataChanged(3),
+    NodeChildrenChanged(4),
+    CONNECTION_SUSPENDED(11),
+    CONNECTION_RECONNECTED(12),
+    CONNECTION_LOST(12),
+    INITIALIZED(10);
+
+
+
+    private final int intValue;     // Integer representation of value
+    // for sending over wire
+
+    EventType(int intValue) {
+        this.intValue = intValue;
+    }
+
+    public int getIntValue() {
+        return intValue;
+    }
+
+    public static Watcher.Event.EventType fromInt(int intValue) {
+        switch (intValue) {
+            case -1:
+                return Watcher.Event.EventType.None;
+            case 1:
+                return Watcher.Event.EventType.NodeCreated;
+            case 2:
+                return Watcher.Event.EventType.NodeDeleted;
+            case 3:
+                return Watcher.Event.EventType.NodeDataChanged;
+            case 4:
+                return Watcher.Event.EventType.NodeChildrenChanged;
+
+            default:
+                throw new RuntimeException("Invalid integer value for conversion to EventType");
+        }
+    }
+}
diff --git a/dubbo-remoting/dubbo-remoting-zookeeper/src/main/java/org/apache/dubbo/remoting/zookeeper/ZookeeperClient.java b/dubbo-remoting/dubbo-remoting-zookeeper/src/main/java/org/apache/dubbo/remoting/zookeeper/ZookeeperClient.java
index b6875ee..cbb3747 100644
--- a/dubbo-remoting/dubbo-remoting-zookeeper/src/main/java/org/apache/dubbo/remoting/zookeeper/ZookeeperClient.java
+++ b/dubbo-remoting/dubbo-remoting-zookeeper/src/main/java/org/apache/dubbo/remoting/zookeeper/ZookeeperClient.java
@@ -19,6 +19,7 @@ package org.apache.dubbo.remoting.zookeeper;
 import org.apache.dubbo.common.URL;
 
 import java.util.List;
+import java.util.concurrent.Executor;
 
 public interface ZookeeperClient {
 
@@ -30,6 +31,21 @@ public interface ZookeeperClient {
 
     List<String> addChildListener(String path, ChildListener listener);
 
+    /**
+     * @param path:    directory. All of child of path will be listened.
+     * @param listener
+     */
+    void addDataListener(String path, DataListener listener);
+
+    /**
+     * @param path:    directory. All of child of path will be listened.
+     * @param listener
+     * @param executor another thread
+     */
+    void addDataListener(String path, DataListener listener, Executor executor);
+
+    void removeDataListener(String path, DataListener listener);
+
     void removeChildListener(String path, ChildListener listener);
 
     void addStateListener(StateListener listener);
diff --git a/dubbo-remoting/dubbo-remoting-zookeeper/src/main/java/org/apache/dubbo/remoting/zookeeper/curator/CuratorZookeeperClient.java b/dubbo-remoting/dubbo-remoting-zookeeper/src/main/java/org/apache/dubbo/remoting/zookeeper/curator/CuratorZookeeperClient.java
index a78edda..4bf7b6d 100644
--- a/dubbo-remoting/dubbo-remoting-zookeeper/src/main/java/org/apache/dubbo/remoting/zookeeper/curator/CuratorZookeeperClient.java
+++ b/dubbo-remoting/dubbo-remoting-zookeeper/src/main/java/org/apache/dubbo/remoting/zookeeper/curator/CuratorZookeeperClient.java
@@ -16,18 +16,24 @@
  */
 package org.apache.dubbo.remoting.zookeeper.curator;
 
-import org.apache.curator.framework.CuratorFramework;
-import org.apache.curator.framework.CuratorFrameworkFactory;
-import org.apache.curator.framework.api.CuratorWatcher;
-import org.apache.curator.framework.state.ConnectionState;
-import org.apache.curator.framework.state.ConnectionStateListener;
-import org.apache.curator.retry.RetryNTimes;
 import org.apache.dubbo.common.Constants;
 import org.apache.dubbo.common.URL;
 import org.apache.dubbo.common.utils.StringUtils;
 import org.apache.dubbo.remoting.zookeeper.ChildListener;
+import org.apache.dubbo.remoting.zookeeper.DataListener;
+import org.apache.dubbo.remoting.zookeeper.EventType;
 import org.apache.dubbo.remoting.zookeeper.StateListener;
 import org.apache.dubbo.remoting.zookeeper.support.AbstractZookeeperClient;
+
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.CuratorFrameworkFactory;
+import org.apache.curator.framework.api.CuratorWatcher;
+import org.apache.curator.framework.recipes.cache.TreeCache;
+import org.apache.curator.framework.recipes.cache.TreeCacheEvent;
+import org.apache.curator.framework.recipes.cache.TreeCacheListener;
+import org.apache.curator.framework.state.ConnectionState;
+import org.apache.curator.framework.state.ConnectionStateListener;
+import org.apache.curator.retry.RetryNTimes;
 import org.apache.zookeeper.CreateMode;
 import org.apache.zookeeper.KeeperException.NoNodeException;
 import org.apache.zookeeper.KeeperException.NodeExistsException;
@@ -36,11 +42,15 @@ import org.apache.zookeeper.WatchedEvent;
 import java.nio.charset.Charset;
 import java.util.Collections;
 import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Executor;
 
-public class CuratorZookeeperClient extends AbstractZookeeperClient<CuratorWatcher> {
+public class CuratorZookeeperClient extends AbstractZookeeperClient<CuratorZookeeperClient.CuratorWatcherImpl, CuratorZookeeperClient.CuratorWatcherImpl> {
 
-    private final Charset charset = Charset.forName("UTF-8");
+    static final Charset charset = Charset.forName("UTF-8");
     private final CuratorFramework client;
+    private Map<String, TreeCache> treeCacheMap = new ConcurrentHashMap<>();
 
 
     public CuratorZookeeperClient(URL url) {
@@ -96,10 +106,15 @@ public class CuratorZookeeperClient extends AbstractZookeeperClient<CuratorWatch
 
     @Override
     protected void createPersistent(String path, String data) {
+        byte[] dataBytes = data.getBytes(charset);
         try {
-            byte[] dataBytes = data.getBytes(charset);
             client.create().forPath(path, dataBytes);
         } catch (NodeExistsException e) {
+            try {
+                client.setData().forPath(path, dataBytes);
+            } catch (Exception e1) {
+                throw new IllegalStateException(e.getMessage(), e1);
+            }
         } catch (Exception e) {
             throw new IllegalStateException(e.getMessage(), e);
         }
@@ -107,10 +122,15 @@ public class CuratorZookeeperClient extends AbstractZookeeperClient<CuratorWatch
 
     @Override
     protected void createEphemeral(String path, String data) {
+        byte[] dataBytes = data.getBytes(charset);
         try {
-            byte[] dataBytes = data.getBytes(charset);
             client.create().withMode(CreateMode.EPHEMERAL).forPath(path, dataBytes);
         } catch (NodeExistsException e) {
+            try {
+                client.setData().forPath(path, dataBytes);
+            } catch (Exception e1) {
+                throw new IllegalStateException(e.getMessage(), e1);
+            }
         } catch (Exception e) {
             throw new IllegalStateException(e.getMessage(), e);
         }
@@ -172,12 +192,12 @@ public class CuratorZookeeperClient extends AbstractZookeeperClient<CuratorWatch
     }
 
     @Override
-    public CuratorWatcher createTargetChildListener(String path, ChildListener listener) {
-        return new CuratorWatcherImpl(listener);
+    public CuratorZookeeperClient.CuratorWatcherImpl createTargetChildListener(String path, ChildListener listener) {
+        return new CuratorZookeeperClient.CuratorWatcherImpl(client, listener);
     }
 
     @Override
-    public List<String> addTargetChildListener(String path, CuratorWatcher listener) {
+    public List<String> addTargetChildListener(String path, CuratorWatcherImpl listener) {
         try {
             return client.getChildren().usingWatcher(listener).forPath(path);
         } catch (NoNodeException e) {
@@ -188,27 +208,73 @@ public class CuratorZookeeperClient extends AbstractZookeeperClient<CuratorWatch
     }
 
     @Override
-    public void removeTargetChildListener(String path, CuratorWatcher listener) {
-        ((CuratorWatcherImpl) listener).unwatch();
+    protected CuratorZookeeperClient.CuratorWatcherImpl createTargetDataListener(String path, DataListener listener) {
+        return new CuratorWatcherImpl(client, listener);
+    }
+
+    @Override
+    protected void addTargetDataListener(String path, CuratorZookeeperClient.CuratorWatcherImpl treeCacheListener) {
+        this.addTargetDataListener(path, treeCacheListener, null);
+    }
+
+    @Override
+    protected void addTargetDataListener(String path, CuratorZookeeperClient.CuratorWatcherImpl treeCacheListener, Executor executor) {
+        try {
+            TreeCache treeCache = TreeCache.newBuilder(client, path).setCacheData(false).build();
+            treeCacheMap.putIfAbsent(path, treeCache);
+            treeCache.start();
+            if (executor == null) {
+                treeCache.getListenable().addListener(treeCacheListener);
+            } else {
+                treeCache.getListenable().addListener(treeCacheListener, executor);
+            }
+        } catch (Exception e) {
+            throw new IllegalStateException("Add treeCache listener for path:" + path, e);
+        }
+    }
+
+    @Override
+    protected void removeTargetDataListener(String path, CuratorZookeeperClient.CuratorWatcherImpl treeCacheListener) {
+        TreeCache treeCache = treeCacheMap.get(path);
+        if (treeCache != null) {
+            treeCache.getListenable().removeListener(treeCacheListener);
+        }
+        treeCacheListener.dataListener = null;
     }
 
-    private class CuratorWatcherImpl implements CuratorWatcher {
+    @Override
+    public void removeTargetChildListener(String path, CuratorWatcherImpl listener) {
+        listener.unwatch();
+    }
+
+    static class CuratorWatcherImpl implements CuratorWatcher, TreeCacheListener {
+
+        private CuratorFramework client;
+        private volatile ChildListener childListener;
+        private volatile DataListener dataListener;
 
-        private volatile ChildListener listener;
 
-        public CuratorWatcherImpl(ChildListener listener) {
-            this.listener = listener;
+        public CuratorWatcherImpl(CuratorFramework client, ChildListener listener) {
+            this.client = client;
+            this.childListener = listener;
+        }
+
+        public CuratorWatcherImpl(CuratorFramework client, DataListener dataListener) {
+            this.dataListener = dataListener;
+        }
+
+        protected CuratorWatcherImpl() {
         }
 
         public void unwatch() {
-            this.listener = null;
+            this.childListener = null;
         }
 
         @Override
         public void process(WatchedEvent event) throws Exception {
-            if (listener != null) {
+            if (childListener != null) {
                 String path = event.getPath() == null ? "" : event.getPath();
-                listener.childChanged(path,
+                childListener.childChanged(path,
                         // if path is null, curator using watcher will throw NullPointerException.
                         // if client connect or disconnect to server, zookeeper will queue
                         // watched event(Watcher.Event.EventType.None, .., path = null).
@@ -217,6 +283,54 @@ public class CuratorZookeeperClient extends AbstractZookeeperClient<CuratorWatch
                                 : Collections.<String>emptyList());
             }
         }
+
+        @Override
+        public void childEvent(CuratorFramework client, TreeCacheEvent event) throws Exception {
+            if (dataListener != null) {
+                TreeCacheEvent.Type type = event.getType();
+                EventType eventType = null;
+                String content = null;
+                String path = null;
+                switch (type) {
+                    case NODE_ADDED:
+                        eventType = EventType.NodeCreated;
+                        path = event.getData().getPath();
+                        content = new String(event.getData().getData(), charset);
+                        break;
+                    case NODE_UPDATED:
+                        eventType = EventType.NodeDataChanged;
+                        path = event.getData().getPath();
+                        content = new String(event.getData().getData(), charset);
+                        break;
+                    case NODE_REMOVED:
+                        path = event.getData().getPath();
+                        eventType = EventType.NodeDeleted;
+                        break;
+                    case INITIALIZED:
+                        eventType = EventType.INITIALIZED;
+                        break;
+                    case CONNECTION_LOST:
+                        eventType = EventType.CONNECTION_LOST;
+                        break;
+                    case CONNECTION_RECONNECTED:
+                        eventType = EventType.CONNECTION_RECONNECTED;
+                        break;
+                    case CONNECTION_SUSPENDED:
+                        eventType = EventType.CONNECTION_SUSPENDED;
+                        break;
+
+                }
+                dataListener.dataChanged(path, content, eventType);
+            }
+        }
     }
 
+    /**
+     * just for unit test
+     *
+     * @return
+     */
+    CuratorFramework getClient() {
+        return client;
+    }
 }
diff --git a/dubbo-remoting/dubbo-remoting-zookeeper/src/main/java/org/apache/dubbo/remoting/zookeeper/support/AbstractZookeeperClient.java b/dubbo-remoting/dubbo-remoting-zookeeper/src/main/java/org/apache/dubbo/remoting/zookeeper/support/AbstractZookeeperClient.java
index e90f7fb..9697cea 100644
--- a/dubbo-remoting/dubbo-remoting-zookeeper/src/main/java/org/apache/dubbo/remoting/zookeeper/support/AbstractZookeeperClient.java
+++ b/dubbo-remoting/dubbo-remoting-zookeeper/src/main/java/org/apache/dubbo/remoting/zookeeper/support/AbstractZookeeperClient.java
@@ -20,6 +20,7 @@ import org.apache.dubbo.common.URL;
 import org.apache.dubbo.common.logger.Logger;
 import org.apache.dubbo.common.logger.LoggerFactory;
 import org.apache.dubbo.remoting.zookeeper.ChildListener;
+import org.apache.dubbo.remoting.zookeeper.DataListener;
 import org.apache.dubbo.remoting.zookeeper.StateListener;
 import org.apache.dubbo.remoting.zookeeper.ZookeeperClient;
 
@@ -28,8 +29,9 @@ import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.CopyOnWriteArraySet;
+import java.util.concurrent.Executor;
 
-public abstract class AbstractZookeeperClient<TargetChildListener> implements ZookeeperClient {
+public abstract class AbstractZookeeperClient<TargetDataListener, TargetChildListener> implements ZookeeperClient {
 
     protected static final Logger logger = LoggerFactory.getLogger(AbstractZookeeperClient.class);
 
@@ -39,6 +41,8 @@ public abstract class AbstractZookeeperClient<TargetChildListener> implements Zo
 
     private final ConcurrentMap<String, ConcurrentMap<ChildListener, TargetChildListener>> childListeners = new ConcurrentHashMap<String, ConcurrentMap<ChildListener, TargetChildListener>>();
 
+    private final ConcurrentMap<String, ConcurrentMap<DataListener, TargetDataListener>> listeners = new ConcurrentHashMap<String, ConcurrentMap<DataListener, TargetDataListener>>();
+
     private volatile boolean closed = false;
 
     public AbstractZookeeperClient(URL url) {
@@ -98,6 +102,37 @@ public abstract class AbstractZookeeperClient<TargetChildListener> implements Zo
     }
 
     @Override
+    public void addDataListener(String path, DataListener listener) {
+        this.addDataListener(path, listener, null);
+    }
+
+    @Override
+    public void addDataListener(String path, DataListener listener, Executor executor) {
+        ConcurrentMap<DataListener, TargetDataListener> dataListenerMap = listeners.get(path);
+        if (dataListenerMap == null) {
+            listeners.putIfAbsent(path, new ConcurrentHashMap<DataListener, TargetDataListener>());
+            dataListenerMap = listeners.get(path);
+        }
+        TargetDataListener targetListener = dataListenerMap.get(listener);
+        if (targetListener == null) {
+            dataListenerMap.putIfAbsent(listener, createTargetDataListener(path, listener));
+            targetListener = dataListenerMap.get(listener);
+        }
+        addTargetDataListener(path, targetListener, executor);
+    }
+
+    @Override
+    public void removeDataListener(String path, DataListener listener ){
+        ConcurrentMap<DataListener, TargetDataListener> dataListenerMap = listeners.get(path);
+        if (dataListenerMap != null) {
+            TargetDataListener targetListener = dataListenerMap.remove(listener);
+            if(targetListener != null){
+                removeTargetDataListener(path, targetListener);
+            }
+        }
+    }
+
+    @Override
     public void removeChildListener(String path, ChildListener listener) {
         ConcurrentMap<ChildListener, TargetChildListener> listeners = childListeners.get(path);
         if (listeners != null) {
@@ -167,6 +202,14 @@ public abstract class AbstractZookeeperClient<TargetChildListener> implements Zo
 
     protected abstract List<String> addTargetChildListener(String path, TargetChildListener listener);
 
+    protected abstract TargetDataListener createTargetDataListener(String path, DataListener listener);
+
+    protected abstract void addTargetDataListener(String path, TargetDataListener listener);
+
+    protected abstract void addTargetDataListener(String path, TargetDataListener listener, Executor executor);
+
+    protected abstract void removeTargetDataListener(String path, TargetDataListener listener);
+
     protected abstract void removeTargetChildListener(String path, TargetChildListener listener);
 
     protected abstract String doGetContent(String path);
diff --git a/dubbo-remoting/dubbo-remoting-zookeeper/src/main/java/org/apache/dubbo/remoting/zookeeper/zkclient/ZkClientWrapper.java b/dubbo-remoting/dubbo-remoting-zookeeper/src/main/java/org/apache/dubbo/remoting/zookeeper/zkclient/ZkClientWrapper.java
deleted file mode 100644
index ae8a3ef..0000000
--- a/dubbo-remoting/dubbo-remoting-zookeeper/src/main/java/org/apache/dubbo/remoting/zookeeper/zkclient/ZkClientWrapper.java
+++ /dev/null
@@ -1,144 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.dubbo.remoting.zookeeper.zkclient;
-
-import org.I0Itec.zkclient.IZkChildListener;
-import org.I0Itec.zkclient.IZkStateListener;
-import org.I0Itec.zkclient.ZkClient;
-import org.apache.dubbo.common.logger.Logger;
-import org.apache.dubbo.common.logger.LoggerFactory;
-import org.apache.dubbo.common.utils.Assert;
-import org.apache.zookeeper.Watcher.Event.KeeperState;
-
-import java.util.List;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.TimeUnit;
-
-/**
- * Zkclient wrapper class that can monitor the state of the connection automatically after the connection is out of time
- * It is also consistent with the use of curator
- *
- * @date 2017/10/29
- */
-public class ZkClientWrapper {
-    private Logger logger = LoggerFactory.getLogger(ZkClientWrapper.class);
-    private long timeout;
-    private ZkClient client;
-    private volatile KeeperState state;
-    private CompletableFuture<ZkClient> completableFuture;
-    private volatile boolean started = false;
-
-    public ZkClientWrapper(final String serverAddr, long timeout) {
-        this.timeout = timeout;
-        completableFuture = CompletableFuture.supplyAsync(() -> new ZkClient(serverAddr, Integer.MAX_VALUE));
-    }
-
-    public void start() {
-        if (!started) {
-            try {
-                client = completableFuture.get(timeout, TimeUnit.MILLISECONDS);
-//                this.client.subscribeStateChanges(IZkStateListener);
-            } catch (Throwable t) {
-                logger.error("Timeout! zookeeper server can not be connected in : " + timeout + "ms!", t);
-                completableFuture.whenComplete(this::makeClientReady);
-            }
-            started = true;
-        } else {
-            logger.warn("Zkclient has already been started!");
-        }
-    }
-
-    public void addListener(IZkStateListener listener) {
-        completableFuture.whenComplete((value, exception) -> {
-            this.makeClientReady(value, exception);
-            if (exception == null) {
-                client.subscribeStateChanges(listener);
-            }
-        });
-    }
-
-    public boolean isConnected() {
-//        return client != null && state == KeeperState.SyncConnected;
-        return client != null;
-    }
-
-    public void createPersistent(String path) {
-        Assert.notNull(client, new IllegalStateException("Zookeeper is not connected yet!"));
-        client.createPersistent(path, true);
-    }
-
-    public void createEphemeral(String path) {
-        Assert.notNull(client, new IllegalStateException("Zookeeper is not connected yet!"));
-        client.createEphemeral(path);
-    }
-
-    public void createPersistent(String path, String data) {
-        Assert.notNull(client, new IllegalStateException("Zookeeper is not connected yet!"));
-        client.createPersistent(path, data);
-    }
-
-    public void createEphemeral(String path, String data) {
-        Assert.notNull(client, new IllegalStateException("Zookeeper is not connected yet!"));
-        client.createEphemeral(path, data);
-    }
-
-    public void delete(String path) {
-        Assert.notNull(client, new IllegalStateException("Zookeeper is not connected yet!"));
-        client.delete(path);
-    }
-
-    public List<String> getChildren(String path) {
-        Assert.notNull(client, new IllegalStateException("Zookeeper is not connected yet!"));
-        return client.getChildren(path);
-    }
-
-    public String getData(String path) {
-        Assert.notNull(client, new IllegalStateException("Zookeeper is not connected yet!"));
-        return client.readData(path);
-    }
-
-    public boolean exists(String path) {
-        Assert.notNull(client, new IllegalStateException("Zookeeper is not connected yet!"));
-        return client.exists(path);
-    }
-
-    public void close() {
-        Assert.notNull(client, new IllegalStateException("Zookeeper is not connected yet!"));
-        client.close();
-    }
-
-    public List<String> subscribeChildChanges(String path, final IZkChildListener listener) {
-        Assert.notNull(client, new IllegalStateException("Zookeeper is not connected yet!"));
-        return client.subscribeChildChanges(path, listener);
-    }
-
-    public void unsubscribeChildChanges(String path, IZkChildListener listener) {
-        Assert.notNull(client, new IllegalStateException("Zookeeper is not connected yet!"));
-        client.unsubscribeChildChanges(path, listener);
-    }
-
-    private void makeClientReady(ZkClient client, Throwable e) {
-        if (e != null) {
-            logger.error("Got an exception when trying to create zkclient instance, can not connect to zookeeper server, please check!", e);
-        } else {
-            this.client = client;
-//            this.client.subscribeStateChanges(IZkStateListener);
-        }
-    }
-
-
-}
diff --git a/dubbo-remoting/dubbo-remoting-zookeeper/src/main/java/org/apache/dubbo/remoting/zookeeper/zkclient/ZkclientZookeeperClient.java b/dubbo-remoting/dubbo-remoting-zookeeper/src/main/java/org/apache/dubbo/remoting/zookeeper/zkclient/ZkclientZookeeperClient.java
deleted file mode 100644
index c366400..0000000
--- a/dubbo-remoting/dubbo-remoting-zookeeper/src/main/java/org/apache/dubbo/remoting/zookeeper/zkclient/ZkclientZookeeperClient.java
+++ /dev/null
@@ -1,168 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.dubbo.remoting.zookeeper.zkclient;
-
-
-import org.I0Itec.zkclient.IZkChildListener;
-import org.I0Itec.zkclient.IZkStateListener;
-import org.I0Itec.zkclient.exception.ZkNoNodeException;
-import org.I0Itec.zkclient.exception.ZkNodeExistsException;
-import org.apache.dubbo.common.Constants;
-import org.apache.dubbo.common.URL;
-import org.apache.dubbo.common.logger.Logger;
-import org.apache.dubbo.common.logger.LoggerFactory;
-import org.apache.dubbo.remoting.zookeeper.ChildListener;
-import org.apache.dubbo.remoting.zookeeper.StateListener;
-import org.apache.dubbo.remoting.zookeeper.support.AbstractZookeeperClient;
-import org.apache.zookeeper.Watcher.Event.KeeperState;
-
-import java.util.List;
-
-public class ZkclientZookeeperClient extends AbstractZookeeperClient<IZkChildListener> {
-
-    private Logger logger = LoggerFactory.getLogger(ZkclientZookeeperClient.class);
-
-    private final ZkClientWrapper client;
-
-    private volatile KeeperState state = KeeperState.SyncConnected;
-
-    public ZkclientZookeeperClient(URL url) {
-        super(url);
-        long timeout = url.getParameter(Constants.TIMEOUT_KEY, 30000L);
-        client = new ZkClientWrapper(url.getBackupAddress(), timeout);
-        client.addListener(new IZkStateListener() {
-            @Override
-            public void handleStateChanged(KeeperState state) throws Exception {
-                ZkclientZookeeperClient.this.state = state;
-                if (state == KeeperState.Disconnected) {
-                    stateChanged(StateListener.DISCONNECTED);
-                } else if (state == KeeperState.SyncConnected) {
-                    stateChanged(StateListener.CONNECTED);
-                }
-            }
-
-            @Override
-            public void handleNewSession() throws Exception {
-                stateChanged(StateListener.RECONNECTED);
-            }
-        });
-        client.start();
-    }
-
-    @Override
-    public void createPersistent(String path) {
-        try {
-            client.createPersistent(path);
-        } catch (ZkNodeExistsException e) {
-            logger.error("zookeeper failed to create persistent node with " + path + ": ", e);
-        }
-    }
-
-    @Override
-    public void createEphemeral(String path) {
-        try {
-            client.createEphemeral(path);
-        } catch (ZkNodeExistsException e) {
-            logger.error("zookeeper failed to create ephemeral node with " + path + ": ", e);
-        }
-    }
-
-    @Override
-    protected void createPersistent(String path, String data) {
-        try {
-            client.createPersistent(path, data);
-        } catch (ZkNodeExistsException e) {
-            logger.error("zookeeper failed to create persistent node with " +
-                    path + " and " + data + " : ", e);
-        }
-    }
-
-    @Override
-    protected void createEphemeral(String path, String data) {
-        try {
-            client.createEphemeral(path, data);
-        } catch (ZkNodeExistsException e) {
-            logger.error("zookeeper failed to create ephemeral node with " +
-                    path + " and " + data + " : ", e);
-        }
-    }
-
-    @Override
-    public void delete(String path) {
-        try {
-            client.delete(path);
-        } catch (ZkNoNodeException e) {
-            logger.error("zookeeper failed to delete node with " + path + ": ", e);
-        }
-    }
-
-    @Override
-    public List<String> getChildren(String path) {
-        try {
-            return client.getChildren(path);
-        } catch (ZkNoNodeException e) {
-            logger.error("zookeeper failed to get children node with " + path + ": ", e);
-            return null;
-        }
-    }
-
-    @Override
-    public boolean checkExists(String path) {
-        try {
-            return client.exists(path);
-        } catch (Throwable t) {
-            logger.error("zookeeper failed to check node existing with " + path + ": ", t);
-        }
-        return false;
-    }
-
-    @Override
-    public boolean isConnected() {
-        return state == KeeperState.SyncConnected;
-    }
-
-    @Override
-    public String doGetContent(String path) {
-        try {
-            return client.getData(path);
-        } catch (ZkNoNodeException e) {
-            logger.error("zookeeper failed to get data with " + path + ": ", e);
-            return null;
-        }
-    }
-
-    @Override
-    public void doClose() {
-        client.close();
-    }
-
-    @Override
-    public IZkChildListener createTargetChildListener(String path, final ChildListener listener) {
-        return listener::childChanged;
-    }
-
-    @Override
-    public List<String> addTargetChildListener(String path, final IZkChildListener listener) {
-        return client.subscribeChildChanges(path, listener);
-    }
-
-    @Override
-    public void removeTargetChildListener(String path, IZkChildListener listener) {
-        client.unsubscribeChildChanges(path, listener);
-    }
-
-}
diff --git a/dubbo-remoting/dubbo-remoting-zookeeper/src/main/resources/META-INF/dubbo/internal/org.apache.dubbo.remoting.zookeeper.ZookeeperTransporter b/dubbo-remoting/dubbo-remoting-zookeeper/src/main/resources/META-INF/dubbo/internal/org.apache.dubbo.remoting.zookeeper.ZookeeperTransporter
index e9b9349..f8cbd5b 100644
--- a/dubbo-remoting/dubbo-remoting-zookeeper/src/main/resources/META-INF/dubbo/internal/org.apache.dubbo.remoting.zookeeper.ZookeeperTransporter
+++ b/dubbo-remoting/dubbo-remoting-zookeeper/src/main/resources/META-INF/dubbo/internal/org.apache.dubbo.remoting.zookeeper.ZookeeperTransporter
@@ -1,2 +1 @@
-zkclient=org.apache.dubbo.remoting.zookeeper.zkclient.ZkclientZookeeperTransporter
-curator=org.apache.dubbo.remoting.zookeeper.curator.CuratorZookeeperTransporter
\ No newline at end of file
+curator=org.apache.dubbo.remoting.zookeeper.curator.CuratorZookeeperTransporter
diff --git a/dubbo-remoting/dubbo-remoting-zookeeper/src/test/java/org/apache/dubbo/remoting/zookeeper/curator/CuratorZookeeperClientTest.java b/dubbo-remoting/dubbo-remoting-zookeeper/src/test/java/org/apache/dubbo/remoting/zookeeper/curator/CuratorZookeeperClientTest.java
index cb89b16..f1882e1 100644
--- a/dubbo-remoting/dubbo-remoting-zookeeper/src/test/java/org/apache/dubbo/remoting/zookeeper/curator/CuratorZookeeperClientTest.java
+++ b/dubbo-remoting/dubbo-remoting-zookeeper/src/test/java/org/apache/dubbo/remoting/zookeeper/curator/CuratorZookeeperClientTest.java
@@ -20,7 +20,11 @@ import org.apache.dubbo.common.URL;
 import org.apache.dubbo.common.utils.NetUtils;
 import org.apache.dubbo.remoting.zookeeper.ChildListener;
 
-import org.apache.curator.framework.api.CuratorWatcher;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.CuratorFrameworkFactory;
+import org.apache.curator.framework.recipes.cache.TreeCacheEvent;
+import org.apache.curator.framework.recipes.cache.TreeCacheListener;
+import org.apache.curator.retry.ExponentialBackoffRetry;
 import org.apache.curator.test.TestingServer;
 import org.apache.zookeeper.WatchedEvent;
 import org.junit.jupiter.api.AfterEach;
@@ -31,6 +35,7 @@ import org.junit.jupiter.api.Test;
 import java.io.IOException;
 import java.util.List;
 import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.core.Is.is;
@@ -41,6 +46,7 @@ import static org.mockito.Mockito.mock;
 public class CuratorZookeeperClientTest {
     private TestingServer zkServer;
     private CuratorZookeeperClient curatorClient;
+    CuratorFramework client = null;
 
     @BeforeEach
     public void setUp() throws Exception {
@@ -48,6 +54,8 @@ public class CuratorZookeeperClientTest {
         zkServer = new TestingServer(zkServerPort, true);
         curatorClient = new CuratorZookeeperClient(URL.valueOf("zookeeper://127.0.0.1:" +
                 zkServerPort + "/org.apache.dubbo.registry.RegistryService"));
+        client = CuratorFrameworkFactory.newClient(zkServer.getConnectString(), new ExponentialBackoffRetry(1000, 3));
+        client.start();
     }
 
     @Test
@@ -74,7 +82,8 @@ public class CuratorZookeeperClientTest {
         String path = "/dubbo/org.apache.dubbo.demo.DemoService/providers";
         curatorClient.create(path, false);
         final CountDownLatch countDownLatch = new CountDownLatch(1);
-        curatorClient.addTargetChildListener(path, new CuratorWatcher() {
+        curatorClient.addTargetChildListener(path, new CuratorZookeeperClient.CuratorWatcherImpl() {
+
             @Override
             public void process(WatchedEvent watchedEvent) throws Exception {
                 countDownLatch.countDown();
@@ -153,4 +162,35 @@ public class CuratorZookeeperClientTest {
         curatorClient.close();
         zkServer.stop();
     }
+
+    @Test
+    public void testAddTargetDataListener() throws Exception {
+        String listenerPath = "/dubbo/service.name/configuration";
+        String path = listenerPath + "/dat/data";
+        String value = "vav";
+
+        curatorClient.create(path + "/d.json", value, true);
+        String valueFromCache = curatorClient.getContent(path + "/d.json");
+        Assertions.assertEquals(value, valueFromCache);
+        final AtomicInteger atomicInteger = new AtomicInteger(0);
+        curatorClient.addTargetDataListener(listenerPath, new CuratorZookeeperClient.CuratorWatcherImpl() {
+            @Override
+            public void childEvent(CuratorFramework client, TreeCacheEvent event) throws Exception {
+                System.out.println("===" + event);
+                atomicInteger.incrementAndGet();
+            }
+        });
+
+        valueFromCache = curatorClient.getContent(path + "/d.json");
+        Assertions.assertNotNull(valueFromCache);
+        curatorClient.getClient().setData().forPath(path + "/d.json", "sdsdf".getBytes());
+        curatorClient.getClient().setData().forPath(path + "/d.json", "dfsasf".getBytes());
+        curatorClient.delete(path + "/d.json");
+        curatorClient.delete(path);
+        valueFromCache = curatorClient.getContent(path + "/d.json");
+        Assertions.assertNull(valueFromCache);
+        Thread.sleep(2000l);
+        Assertions.assertTrue(9l >= atomicInteger.get());
+        Assertions.assertTrue(2l <= atomicInteger.get());
+    }
 }
diff --git a/dubbo-remoting/dubbo-remoting-zookeeper/src/test/java/org/apache/dubbo/remoting/zookeeper/zkclient/ZkClientWrapperTest.java b/dubbo-remoting/dubbo-remoting-zookeeper/src/test/java/org/apache/dubbo/remoting/zookeeper/zkclient/ZkClientWrapperTest.java
deleted file mode 100644
index 629c0e9..0000000
--- a/dubbo-remoting/dubbo-remoting-zookeeper/src/test/java/org/apache/dubbo/remoting/zookeeper/zkclient/ZkClientWrapperTest.java
+++ /dev/null
@@ -1,56 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.dubbo.remoting.zookeeper.zkclient;
-
-import org.apache.dubbo.common.utils.NetUtils;
-import org.I0Itec.zkclient.IZkChildListener;
-import org.apache.curator.test.TestingServer;
-import org.junit.jupiter.api.AfterEach;
-import org.junit.jupiter.api.BeforeEach;
-import org.junit.jupiter.api.Test;
-
-import static org.hamcrest.CoreMatchers.is;
-import static org.hamcrest.MatcherAssert.assertThat;
-import static org.mockito.Mockito.mock;
-
-public class ZkClientWrapperTest {
-    private TestingServer zkServer;
-    private ZkClientWrapper zkClientWrapper;
-
-    @BeforeEach
-    public void setUp() throws Exception {
-        int zkServerPort = NetUtils.getAvailablePort();
-        zkServer = new TestingServer(zkServerPort, true);
-        zkClientWrapper = new ZkClientWrapper("127.0.0.1:" + zkServerPort, 10000);
-    }
-
-    @AfterEach
-    public void tearDown() throws Exception {
-        zkServer.stop();
-    }
-
-    @Test
-    public void testConnectedStatus() {
-        boolean connected = zkClientWrapper.isConnected();
-        assertThat(connected, is(false));
-        zkClientWrapper.start();
-
-        IZkChildListener listener = mock(IZkChildListener.class);
-        zkClientWrapper.subscribeChildChanges("/path", listener);
-        zkClientWrapper.unsubscribeChildChanges("/path", listener);
-    }
-}
\ No newline at end of file
diff --git a/dubbo-remoting/dubbo-remoting-zookeeper/src/test/java/org/apache/dubbo/remoting/zookeeper/zkclient/ZkclientZookeeperClientTest.java b/dubbo-remoting/dubbo-remoting-zookeeper/src/test/java/org/apache/dubbo/remoting/zookeeper/zkclient/ZkclientZookeeperClientTest.java
deleted file mode 100644
index 73c402a..0000000
--- a/dubbo-remoting/dubbo-remoting-zookeeper/src/test/java/org/apache/dubbo/remoting/zookeeper/zkclient/ZkclientZookeeperClientTest.java
+++ /dev/null
@@ -1,140 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.dubbo.remoting.zookeeper.zkclient;
-
-import org.apache.dubbo.common.URL;
-import org.apache.dubbo.common.utils.NetUtils;
-import org.apache.dubbo.remoting.zookeeper.StateListener;
-import org.I0Itec.zkclient.IZkChildListener;
-import org.apache.curator.test.TestingServer;
-import org.junit.jupiter.api.AfterEach;
-import org.junit.jupiter.api.BeforeEach;
-import org.junit.jupiter.api.Test;
-
-import java.io.IOException;
-import java.util.List;
-import java.util.concurrent.CountDownLatch;
-
-import static org.hamcrest.Matchers.hasSize;
-import static org.hamcrest.core.Is.is;
-import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.junit.jupiter.api.Assertions.assertNull;
-import static org.hamcrest.MatcherAssert.assertThat;
-
-public class ZkclientZookeeperClientTest {
-    private TestingServer zkServer;
-    private ZkclientZookeeperClient zkclientZookeeperClient;
-
-    @BeforeEach
-    public void setUp() throws Exception {
-        int zkServerPort = NetUtils.getAvailablePort();
-        zkServer = new TestingServer(zkServerPort, true);
-        zkclientZookeeperClient = new ZkclientZookeeperClient(URL.valueOf("zookeeper://127.0.0.1:" +
-                zkServerPort + "/org.apache.dubbo.registry.RegistryService"));
-    }
-
-    @Test
-    public void testCheckExists() {
-        String path = "/dubbo/org.apache.dubbo.demo.DemoService/providers";
-        zkclientZookeeperClient.create(path, false);
-        assertThat(zkclientZookeeperClient.checkExists(path), is(true));
-        assertThat(zkclientZookeeperClient.checkExists(path + "/noneexits"), is(false));
-    }
-
-    @Test
-    public void testDeletePath() {
-        String path = "/dubbo/org.apache.dubbo.demo.DemoService/providers";
-        zkclientZookeeperClient.create(path, false);
-        assertThat(zkclientZookeeperClient.checkExists(path), is(true));
-
-        zkclientZookeeperClient.delete(path);
-        assertThat(zkclientZookeeperClient.checkExists(path), is(false));
-    }
-
-    @Test
-    public void testConnectState() throws Exception {
-        assertThat(zkclientZookeeperClient.isConnected(), is(true));
-        final CountDownLatch stopLatch = new CountDownLatch(1);
-        zkclientZookeeperClient.addStateListener(new StateListener() {
-            @Override
-            public void stateChanged(int connected) {
-                stopLatch.countDown();
-            }
-        });
-        zkServer.stop();
-        stopLatch.await();
-        assertThat(zkclientZookeeperClient.isConnected(), is(false));
-    }
-
-    @Test
-    public void testChildrenListener() throws InterruptedException {
-        String path = "/dubbo/org.apache.dubbo.demo.DemoService/providers";
-        zkclientZookeeperClient.create(path, false);
-        final CountDownLatch countDownLatch = new CountDownLatch(1);
-        zkclientZookeeperClient.addTargetChildListener(path, new IZkChildListener() {
-            @Override
-            public void handleChildChange(String s, List<String> list) throws Exception {
-                countDownLatch.countDown();
-            }
-        });
-        zkclientZookeeperClient.createPersistent(path + "/provider1");
-        countDownLatch.await();
-    }
-
-    @Test
-    public void testGetChildren() throws IOException {
-        String path = "/dubbo/org.apache.dubbo.demo.DemoService/parentProviders";
-        zkclientZookeeperClient.create(path, false);
-        for (int i = 0; i < 5; i++) {
-            zkclientZookeeperClient.createEphemeral(path + "/server" + i);
-        }
-        List<String> zookeeperClientChildren = zkclientZookeeperClient.getChildren(path);
-        assertThat(zookeeperClientChildren, hasSize(5));
-    }
-
-    @Test
-    public void testCreateContentPersistent() {
-        String path = "/ZkclientZookeeperClient/content.data";
-        String content = "createContentTest";
-        zkclientZookeeperClient.delete(path);
-        assertThat(zkclientZookeeperClient.checkExists(path), is(false));
-        assertNull(zkclientZookeeperClient.getContent(path));
-
-        zkclientZookeeperClient.create(path, content, false);
-        assertThat(zkclientZookeeperClient.checkExists(path), is(true));
-        assertEquals(zkclientZookeeperClient.getContent(path), content);
-    }
-
-    @Test
-    public void testCreateContentTem() {
-        String path = "/ZkclientZookeeperClient/content.data";
-        String content = "createContentTest";
-        zkclientZookeeperClient.delete(path);
-        assertThat(zkclientZookeeperClient.checkExists(path), is(false));
-        assertNull(zkclientZookeeperClient.getContent(path));
-
-        zkclientZookeeperClient.create(path, content, true);
-        assertThat(zkclientZookeeperClient.checkExists(path), is(true));
-        assertEquals(zkclientZookeeperClient.getContent(path), content);
-    }
-
-    @AfterEach
-    public void tearDown() throws Exception {
-        zkclientZookeeperClient.close();
-        zkServer.stop();
-    }
-}
diff --git a/dubbo-remoting/dubbo-remoting-zookeeper/src/test/java/org/apache/dubbo/remoting/zookeeper/zkclient/ZkclientZookeeperTransporterTest.java b/dubbo-remoting/dubbo-remoting-zookeeper/src/test/java/org/apache/dubbo/remoting/zookeeper/zkclient/ZkclientZookeeperTransporterTest.java
deleted file mode 100644
index cbadda9..0000000
--- a/dubbo-remoting/dubbo-remoting-zookeeper/src/test/java/org/apache/dubbo/remoting/zookeeper/zkclient/ZkclientZookeeperTransporterTest.java
+++ /dev/null
@@ -1,53 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.dubbo.remoting.zookeeper.zkclient;
-
-import org.apache.dubbo.common.URL;
-import org.apache.dubbo.common.utils.NetUtils;
-import org.apache.dubbo.remoting.zookeeper.ZookeeperClient;
-import org.apache.curator.test.TestingServer;
-import org.junit.jupiter.api.AfterEach;
-import org.junit.jupiter.api.BeforeEach;
-import org.junit.jupiter.api.Test;
-
-import static org.hamcrest.MatcherAssert.assertThat;
-import static org.hamcrest.core.IsNot.not;
-import static org.hamcrest.core.IsNull.nullValue;
-
-public class ZkclientZookeeperTransporterTest {
-    private TestingServer zkServer;
-    private ZookeeperClient zookeeperClient;
-
-    @BeforeEach
-    public void setUp() throws Exception {
-        int zkServerPort = NetUtils.getAvailablePort();
-        zkServer = new TestingServer(zkServerPort, true);
-        zookeeperClient = new ZkclientZookeeperTransporter().connect(URL.valueOf("zookeeper://127.0.0.1:" +
-                zkServerPort + "/service"));
-    }
-
-    @Test
-    public void testZookeeperClient() {
-        assertThat(zookeeperClient, not(nullValue()));
-        zookeeperClient.close();
-    }
-
-    @AfterEach
-    public void tearDown() throws Exception {
-        zkServer.stop();
-    }
-}
\ No newline at end of file