You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dubbo.apache.org by al...@apache.org on 2021/06/03 09:24:26 UTC
[dubbo] branch 3.0 updated: Use nodeCache to take place of
treeCache (#7953)
This is an automated email from the ASF dual-hosted git repository.
albumenj pushed a commit to branch 3.0
in repository https://gitbox.apache.org/repos/asf/dubbo.git
The following commit(s) were added to refs/heads/3.0 by this push:
new d5b46c3 Use nodeCache to take place of treeCache (#7953)
d5b46c3 is described below
commit d5b46c3e6561b1c42d8f146f03e388342d9d787d
Author: 赵延 <10...@qq.com>
AuthorDate: Thu Jun 3 17:23:30 2021 +0800
Use nodeCache to take place of treeCache (#7953)
---
.../support/zookeeper/CacheListener.java | 60 +++-------
.../zookeeper/ZookeeperDynamicConfiguration.java | 26 ++--
.../ZookeeperDynamicConfigurationTest.java | 11 +-
.../zookeeper/curator/CuratorZookeeperClient.java | 131 +++++++++------------
.../curator/CuratorZookeeperClientTest.java | 18 +--
5 files changed, 102 insertions(+), 144 deletions(-)
diff --git a/dubbo-configcenter/dubbo-configcenter-zookeeper/src/main/java/org/apache/dubbo/configcenter/support/zookeeper/CacheListener.java b/dubbo-configcenter/dubbo-configcenter-zookeeper/src/main/java/org/apache/dubbo/configcenter/support/zookeeper/CacheListener.java
index 5a10be7..66bd0e9 100644
--- a/dubbo-configcenter/dubbo-configcenter-zookeeper/src/main/java/org/apache/dubbo/configcenter/support/zookeeper/CacheListener.java
+++ b/dubbo-configcenter/dubbo-configcenter-zookeeper/src/main/java/org/apache/dubbo/configcenter/support/zookeeper/CacheListener.java
@@ -28,7 +28,6 @@ import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArraySet;
-import java.util.concurrent.CountDownLatch;
import static org.apache.dubbo.common.constants.CommonConstants.DOT_SEPARATOR;
import static org.apache.dubbo.common.constants.CommonConstants.PATH_SEPARATOR;
@@ -38,29 +37,30 @@ import static org.apache.dubbo.common.constants.CommonConstants.PATH_SEPARATOR;
*/
public class CacheListener implements DataListener {
- private static final int MIN_PATH_DEPTH = 5;
private Map<String, Set<ConfigurationListener>> keyListeners = new ConcurrentHashMap<>();
- private CountDownLatch initializedLatch;
private String rootPath;
- public CacheListener(String rootPath, CountDownLatch initializedLatch) {
+ public CacheListener(String rootPath) {
this.rootPath = rootPath;
- this.initializedLatch = initializedLatch;
}
public void addListener(String key, ConfigurationListener configurationListener) {
- Set<ConfigurationListener> listeners = this.keyListeners.computeIfAbsent(key, k -> new CopyOnWriteArraySet<>());
+ Set<ConfigurationListener> listeners = keyListeners.computeIfAbsent(key, k -> new CopyOnWriteArraySet<>());
listeners.add(configurationListener);
}
public void removeListener(String key, ConfigurationListener configurationListener) {
- Set<ConfigurationListener> listeners = this.keyListeners.get(key);
+ Set<ConfigurationListener> listeners = keyListeners.get(key);
if (listeners != null) {
listeners.remove(configurationListener);
}
}
+ public Set<ConfigurationListener> getConfigurationListeners(String key) {
+ return keyListeners.get(key);
+ }
+
/**
* This is used to convert a configuration nodePath into a key
* TODO doc
@@ -93,43 +93,19 @@ public class CacheListener implements DataListener {
@Override
public void dataChanged(String path, Object value, EventType eventType) {
- if (eventType == null) {
- return;
- }
-
- if (eventType == EventType.INITIALIZED) {
- initializedLatch.countDown();
- return;
- }
-
- if (path == null || (value == null && eventType != EventType.NodeDeleted)) {
- return;
+ ConfigChangeType changeType;
+ if (value == null) {
+ changeType = ConfigChangeType.DELETED;
+ } else {
+ changeType = ConfigChangeType.MODIFIED;
}
+ String key = pathToKey(path);
- // TODO We only care the changes happened on a specific path level, for example
- // /dubbo/config/dubbo/configurators, other config changes not in this level will be ignored,
- if (path.split("/").length >= MIN_PATH_DEPTH) {
- String key = pathToKey(path);
- ConfigChangeType changeType;
- switch (eventType) {
- case NodeCreated:
- changeType = ConfigChangeType.ADDED;
- break;
- case NodeDeleted:
- changeType = ConfigChangeType.DELETED;
- break;
- case NodeDataChanged:
- changeType = ConfigChangeType.MODIFIED;
- break;
- default:
- return;
- }
-
- ConfigChangedEvent configChangeEvent = new ConfigChangedEvent(key, getGroup(path), (String) value, changeType);
- Set<ConfigurationListener> listeners = keyListeners.get(path);
- if (CollectionUtils.isNotEmpty(listeners)) {
- listeners.forEach(listener -> listener.process(configChangeEvent));
- }
+ ConfigChangedEvent configChangeEvent = new ConfigChangedEvent(key, getGroup(path), (String) value, changeType);
+ Set<ConfigurationListener> listeners = keyListeners.get(path);
+ if (CollectionUtils.isNotEmpty(listeners)) {
+ listeners.forEach(listener -> listener.process(configChangeEvent));
}
}
}
+
diff --git a/dubbo-configcenter/dubbo-configcenter-zookeeper/src/main/java/org/apache/dubbo/configcenter/support/zookeeper/ZookeeperDynamicConfiguration.java b/dubbo-configcenter/dubbo-configcenter-zookeeper/src/main/java/org/apache/dubbo/configcenter/support/zookeeper/ZookeeperDynamicConfiguration.java
index 3ce72c0..6787a8c 100644
--- a/dubbo-configcenter/dubbo-configcenter-zookeeper/src/main/java/org/apache/dubbo/configcenter/support/zookeeper/ZookeeperDynamicConfiguration.java
+++ b/dubbo-configcenter/dubbo-configcenter-zookeeper/src/main/java/org/apache/dubbo/configcenter/support/zookeeper/ZookeeperDynamicConfiguration.java
@@ -21,12 +21,13 @@ import org.apache.dubbo.common.config.configcenter.ConfigItem;
import org.apache.dubbo.common.config.configcenter.ConfigurationListener;
import org.apache.dubbo.common.config.configcenter.TreePathDynamicConfiguration;
import org.apache.dubbo.common.threadpool.support.AbortPolicyWithReport;
+import org.apache.dubbo.common.utils.CollectionUtils;
import org.apache.dubbo.common.utils.NamedThreadFactory;
import org.apache.dubbo.remoting.zookeeper.ZookeeperClient;
import org.apache.dubbo.remoting.zookeeper.ZookeeperTransporter;
import java.util.Collection;
-import java.util.concurrent.CountDownLatch;
+import java.util.Set;
import java.util.concurrent.Executor;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
@@ -41,7 +42,6 @@ public class ZookeeperDynamicConfiguration extends TreePathDynamicConfiguration
// The final root path would be: /configRootPath/"config"
private String rootPath;
private final ZookeeperClient zkClient;
- private CountDownLatch initializedLatch;
private CacheListener cacheListener;
private URL url;
@@ -55,8 +55,7 @@ public class ZookeeperDynamicConfiguration extends TreePathDynamicConfiguration
this.url = url;
rootPath = getRootPath(url);
- initializedLatch = new CountDownLatch(1);
- this.cacheListener = new CacheListener(rootPath, initializedLatch);
+ this.cacheListener = new CacheListener(rootPath);
final String threadName = this.getClass().getSimpleName();
this.executor = new ThreadPoolExecutor(DEFAULT_ZK_EXECUTOR_THREADS_NUM, DEFAULT_ZK_EXECUTOR_THREADS_NUM,
@@ -66,17 +65,9 @@ public class ZookeeperDynamicConfiguration extends TreePathDynamicConfiguration
new AbortPolicyWithReport(threadName, url));
zkClient = zookeeperTransporter.connect(url);
- zkClient.addDataListener(rootPath, cacheListener, executor);
- try {
- // Wait for connection
- long timeout = url.getParameter("init.timeout", 5000);
- boolean isCountDown = this.initializedLatch.await(timeout, TimeUnit.MILLISECONDS);
- if (!isCountDown) {
- throw new IllegalStateException("Failed to receive INITIALIZED event from zookeeper, pls. check if url "
- + url + " is correct");
- }
- } catch (InterruptedException e) {
- logger.warn("Failed to build local cache for config center (zookeeper)." + url);
+ boolean isConnected = zkClient.isConnected();
+ if (!isConnected) {
+ throw new IllegalStateException("Failed to connect with zookeeper, pls check if url " + url + " is correct.");
}
}
@@ -132,11 +123,16 @@ public class ZookeeperDynamicConfiguration extends TreePathDynamicConfiguration
@Override
protected void doAddListener(String pathKey, ConfigurationListener listener) {
cacheListener.addListener(pathKey, listener);
+ zkClient.addDataListener(pathKey, cacheListener, executor);
}
@Override
protected void doRemoveListener(String pathKey, ConfigurationListener listener) {
cacheListener.removeListener(pathKey, listener);
+ Set<ConfigurationListener> configurationListeners = cacheListener.getConfigurationListeners(pathKey);
+ if (CollectionUtils.isNotEmpty(configurationListeners)) {
+ zkClient.removeDataListener(pathKey, cacheListener);
+ }
}
@Override
diff --git a/dubbo-configcenter/dubbo-configcenter-zookeeper/src/test/java/org/apache/dubbo/configcenter/support/zookeeper/ZookeeperDynamicConfigurationTest.java b/dubbo-configcenter/dubbo-configcenter-zookeeper/src/test/java/org/apache/dubbo/configcenter/support/zookeeper/ZookeeperDynamicConfigurationTest.java
index e40e231..85eba38 100644
--- a/dubbo-configcenter/dubbo-configcenter-zookeeper/src/test/java/org/apache/dubbo/configcenter/support/zookeeper/ZookeeperDynamicConfigurationTest.java
+++ b/dubbo-configcenter/dubbo-configcenter-zookeeper/src/test/java/org/apache/dubbo/configcenter/support/zookeeper/ZookeeperDynamicConfigurationTest.java
@@ -66,7 +66,7 @@ public class ZookeeperDynamicConfigurationTest {
try {
setData("/dubbo/config/dubbo/dubbo.properties", "The content from dubbo.properties");
setData("/dubbo/config/dubbo/service:version:group.configurators", "The content from configurators");
- setData("/dubbo/config/appname", "The content from higer level node");
+ setData("/dubbo/config/appname", "The content from higher level node");
setData("/dubbo/config/dubbo/appname.tag-router", "The content from appname tagrouters");
setData("/dubbo/config/dubbo/never.change.DemoService.configurators", "Never change value from configurators");
} catch (Exception e) {
@@ -109,6 +109,7 @@ public class ZookeeperDynamicConfigurationTest {
configuration.addListener("appname.tag-router", listener3);
configuration.addListener("appname.tag-router", listener4);
+ Thread.sleep(100);
setData("/dubbo/config/dubbo/service:version:group.configurators", "new value1");
Thread.sleep(100);
setData("/dubbo/config/dubbo/appname.tag-router", "new value2");
@@ -118,10 +119,10 @@ public class ZookeeperDynamicConfigurationTest {
Thread.sleep(5000);
latch.await();
- Assertions.assertEquals(1, listener1.getCount("service:version:group.configurators"));
- Assertions.assertEquals(1, listener2.getCount("service:version:group.configurators"));
- Assertions.assertEquals(1, listener3.getCount("appname.tag-router"));
- Assertions.assertEquals(1, listener4.getCount("appname.tag-router"));
+ Assertions.assertEquals(2, listener1.getCount("service:version:group.configurators"));
+ Assertions.assertEquals(2, listener2.getCount("service:version:group.configurators"));
+ Assertions.assertEquals(2, listener3.getCount("appname.tag-router"));
+ Assertions.assertEquals(2, listener4.getCount("appname.tag-router"));
Assertions.assertEquals("new value1", listener1.getValue());
Assertions.assertEquals("new value1", listener2.getValue());
diff --git a/dubbo-remoting/dubbo-remoting-zookeeper/src/main/java/org/apache/dubbo/remoting/zookeeper/curator/CuratorZookeeperClient.java b/dubbo-remoting/dubbo-remoting-zookeeper/src/main/java/org/apache/dubbo/remoting/zookeeper/curator/CuratorZookeeperClient.java
index 193943d..559feef 100644
--- a/dubbo-remoting/dubbo-remoting-zookeeper/src/main/java/org/apache/dubbo/remoting/zookeeper/curator/CuratorZookeeperClient.java
+++ b/dubbo-remoting/dubbo-remoting-zookeeper/src/main/java/org/apache/dubbo/remoting/zookeeper/curator/CuratorZookeeperClient.java
@@ -29,9 +29,9 @@ import org.apache.dubbo.remoting.zookeeper.support.AbstractZookeeperClient;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.api.CuratorWatcher;
-import org.apache.curator.framework.recipes.cache.TreeCache;
-import org.apache.curator.framework.recipes.cache.TreeCacheEvent;
-import org.apache.curator.framework.recipes.cache.TreeCacheListener;
+import org.apache.curator.framework.recipes.cache.ChildData;
+import org.apache.curator.framework.recipes.cache.NodeCache;
+import org.apache.curator.framework.recipes.cache.NodeCacheListener;
import org.apache.curator.framework.state.ConnectionState;
import org.apache.curator.framework.state.ConnectionStateListener;
import org.apache.curator.retry.RetryNTimes;
@@ -52,15 +52,15 @@ import java.util.concurrent.TimeUnit;
import static org.apache.dubbo.common.constants.CommonConstants.TIMEOUT_KEY;
-public class CuratorZookeeperClient
- extends AbstractZookeeperClient<CuratorZookeeperClient.CuratorWatcherImpl, CuratorZookeeperClient.CuratorWatcherImpl> {
+
+public class CuratorZookeeperClient extends AbstractZookeeperClient<CuratorZookeeperClient.NodeCacheListenerImpl, CuratorZookeeperClient.CuratorWatcherImpl> {
protected static final Logger logger = LoggerFactory.getLogger(CuratorZookeeperClient.class);
private static final String ZK_SESSION_EXPIRE_KEY = "zk.session.expire";
static final Charset CHARSET = StandardCharsets.UTF_8;
private final CuratorFramework client;
- private Map<String, TreeCache> treeCacheMap = new ConcurrentHashMap<>();
+ private static Map<String, NodeCache> nodeCacheMap = new ConcurrentHashMap<>();
public CuratorZookeeperClient(URL url) {
super(url);
@@ -193,7 +193,7 @@ public class CuratorZookeeperClient
protected void deletePath(String path) {
try {
client.delete().deletingChildrenIfNeeded().forPath(path);
- } catch (NoNodeException e) {
+ } catch (NoNodeException ignored) {
} catch (Exception e) {
throw new IllegalStateException(e.getMessage(), e);
}
@@ -277,40 +277,41 @@ public class CuratorZookeeperClient
}
@Override
- protected CuratorZookeeperClient.CuratorWatcherImpl createTargetDataListener(String path, DataListener listener) {
- return new CuratorWatcherImpl(client, listener);
+ protected CuratorZookeeperClient.NodeCacheListenerImpl createTargetDataListener(String path, DataListener listener) {
+ return new NodeCacheListenerImpl(client, listener, path);
}
@Override
- protected void addTargetDataListener(String path, CuratorZookeeperClient.CuratorWatcherImpl treeCacheListener) {
- this.addTargetDataListener(path, treeCacheListener, null);
+ protected void addTargetDataListener(String path, CuratorZookeeperClient.NodeCacheListenerImpl nodeCacheListener) {
+ this.addTargetDataListener(path, nodeCacheListener, null);
}
@Override
- protected void addTargetDataListener(String path, CuratorZookeeperClient.CuratorWatcherImpl treeCacheListener, Executor executor) {
+ protected void addTargetDataListener(String path, CuratorZookeeperClient.NodeCacheListenerImpl nodeCacheListener, Executor executor) {
try {
- TreeCache treeCache = TreeCache.newBuilder(client, path).setCacheData(false).build();
- treeCacheMap.putIfAbsent(path, treeCache);
-
+ NodeCache nodeCache = new NodeCache(client, path);
+ if (nodeCacheMap.putIfAbsent(path, nodeCache) != null) {
+ return;
+ }
if (executor == null) {
- treeCache.getListenable().addListener(treeCacheListener);
+ nodeCache.getListenable().addListener(nodeCacheListener);
} else {
- treeCache.getListenable().addListener(treeCacheListener, executor);
+ nodeCache.getListenable().addListener(nodeCacheListener, executor);
}
- treeCache.start();
+ nodeCache.start();
} catch (Exception e) {
- throw new IllegalStateException("Add treeCache listener for path:" + path, e);
+ throw new IllegalStateException("Add nodeCache listener for path:" + path, e);
}
}
@Override
- protected void removeTargetDataListener(String path, CuratorZookeeperClient.CuratorWatcherImpl treeCacheListener) {
- TreeCache treeCache = treeCacheMap.get(path);
- if (treeCache != null) {
- treeCache.getListenable().removeListener(treeCacheListener);
+ protected void removeTargetDataListener(String path, CuratorZookeeperClient.NodeCacheListenerImpl nodeCacheListener) {
+ NodeCache nodeCache = nodeCacheMap.get(path);
+ if (nodeCache != null) {
+ nodeCache.getListenable().removeListener(nodeCacheListener);
}
- treeCacheListener.dataListener = null;
+ nodeCacheListener.dataListener = null;
}
@Override
@@ -318,21 +319,48 @@ public class CuratorZookeeperClient
listener.unwatch();
}
- static class CuratorWatcherImpl implements CuratorWatcher, TreeCacheListener {
+ static class NodeCacheListenerImpl implements NodeCacheListener {
private CuratorFramework client;
- private volatile ChildListener childListener;
+
private volatile DataListener dataListener;
+
private String path;
- public CuratorWatcherImpl(CuratorFramework client, ChildListener listener, String path) {
+ protected NodeCacheListenerImpl() {
+ }
+
+ public NodeCacheListenerImpl(CuratorFramework client, DataListener dataListener, String path) {
this.client = client;
- this.childListener = listener;
+ this.dataListener = dataListener;
this.path = path;
}
- public CuratorWatcherImpl(CuratorFramework client, DataListener dataListener) {
- this.dataListener = dataListener;
+ @Override
+ public void nodeChanged() throws Exception {
+ ChildData childData = nodeCacheMap.get(path).getCurrentData();
+ String content = null;
+ EventType eventType;
+ if (childData == null) {
+ eventType = EventType.NodeDeleted;
+ } else {
+ content = new String(childData.getData(), CHARSET);
+ eventType = EventType.NodeDataChanged;
+ }
+ dataListener.dataChanged(path, content, eventType);
+ }
+ }
+
+ static class CuratorWatcherImpl implements CuratorWatcher {
+
+ private CuratorFramework client;
+ private volatile ChildListener childListener;
+ private String path;
+
+ public CuratorWatcherImpl(CuratorFramework client, ChildListener listener, String path) {
+ this.client = client;
+ this.childListener = listener;
+ this.path = path;
}
protected CuratorWatcherImpl() {
@@ -354,49 +382,6 @@ public class CuratorZookeeperClient
childListener.childChanged(path, client.getChildren().usingWatcher(this).forPath(path));
}
}
-
- @Override
- public void childEvent(CuratorFramework client, TreeCacheEvent event) throws Exception {
- if (dataListener != null) {
- if (logger.isDebugEnabled()) {
- logger.debug("listen the zookeeper changed. The changed data:" + event.getData());
- }
- TreeCacheEvent.Type type = event.getType();
- EventType eventType = null;
- String content = null;
- String path = null;
- switch (type) {
- case NODE_ADDED:
- eventType = EventType.NodeCreated;
- path = event.getData().getPath();
- content = event.getData().getData() == null ? "" : new String(event.getData().getData(), CHARSET);
- break;
- case NODE_UPDATED:
- eventType = EventType.NodeDataChanged;
- path = event.getData().getPath();
- content = event.getData().getData() == null ? "" : new String(event.getData().getData(), CHARSET);
- break;
- case NODE_REMOVED:
- path = event.getData().getPath();
- eventType = EventType.NodeDeleted;
- break;
- case INITIALIZED:
- eventType = EventType.INITIALIZED;
- break;
- case CONNECTION_LOST:
- eventType = EventType.CONNECTION_LOST;
- break;
- case CONNECTION_RECONNECTED:
- eventType = EventType.CONNECTION_RECONNECTED;
- break;
- case CONNECTION_SUSPENDED:
- eventType = EventType.CONNECTION_SUSPENDED;
- break;
-
- }
- dataListener.dataChanged(path, content, eventType);
- }
- }
}
private class CuratorConnectionStateListener implements ConnectionStateListener {
diff --git a/dubbo-remoting/dubbo-remoting-zookeeper/src/test/java/org/apache/dubbo/remoting/zookeeper/curator/CuratorZookeeperClientTest.java b/dubbo-remoting/dubbo-remoting-zookeeper/src/test/java/org/apache/dubbo/remoting/zookeeper/curator/CuratorZookeeperClientTest.java
index 2f2b2f5..aa37cbe 100644
--- a/dubbo-remoting/dubbo-remoting-zookeeper/src/test/java/org/apache/dubbo/remoting/zookeeper/curator/CuratorZookeeperClientTest.java
+++ b/dubbo-remoting/dubbo-remoting-zookeeper/src/test/java/org/apache/dubbo/remoting/zookeeper/curator/CuratorZookeeperClientTest.java
@@ -22,7 +22,6 @@ import org.apache.dubbo.remoting.zookeeper.ChildListener;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
-import org.apache.curator.framework.recipes.cache.TreeCacheEvent;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.curator.test.TestingServer;
import org.apache.zookeeper.WatchedEvent;
@@ -172,24 +171,25 @@ public class CuratorZookeeperClientTest {
String valueFromCache = curatorClient.getContent(path + "/d.json");
Assertions.assertEquals(value, valueFromCache);
final AtomicInteger atomicInteger = new AtomicInteger(0);
- curatorClient.addTargetDataListener(listenerPath, new CuratorZookeeperClient.CuratorWatcherImpl() {
+ curatorClient.addTargetDataListener(path + "/d.json", new CuratorZookeeperClient.NodeCacheListenerImpl() {
+
@Override
- public void childEvent(CuratorFramework client, TreeCacheEvent event) throws Exception {
- System.out.println("===" + event);
+ public void nodeChanged() throws Exception {
atomicInteger.incrementAndGet();
}
});
valueFromCache = curatorClient.getContent(path + "/d.json");
Assertions.assertNotNull(valueFromCache);
- curatorClient.getClient().setData().forPath(path + "/d.json", "sdsdf".getBytes());
- curatorClient.getClient().setData().forPath(path + "/d.json", "dfsasf".getBytes());
+
+ Thread.sleep(100);
+ curatorClient.getClient().setData().forPath(path + "/d.json", "foo".getBytes());
+ Thread.sleep(100);
+ curatorClient.getClient().setData().forPath(path + "/d.json", "bar".getBytes());
curatorClient.delete(path + "/d.json");
- curatorClient.delete(path);
valueFromCache = curatorClient.getContent(path + "/d.json");
Assertions.assertNull(valueFromCache);
Thread.sleep(2000L);
- Assertions.assertTrue(9L >= atomicInteger.get());
- Assertions.assertTrue(2L <= atomicInteger.get());
+ Assertions.assertTrue(3L <= atomicInteger.get());
}
}