You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by xy...@apache.org on 2023/05/18 03:30:23 UTC
[helix] branch metaclient updated: Add find and remove for recursive persist listener trie (#2460)
This is an automated email from the ASF dual-hosted git repository.
xyuanlu pushed a commit to branch metaclient
in repository https://gitbox.apache.org/repos/asf/helix.git
The following commit(s) were added to refs/heads/metaclient by this push:
new e224e0e81 Add find and remove for recursive persist listener trie (#2460)
e224e0e81 is described below
commit e224e0e81cea7c86cd1a9ddfe2d4da2cc4734dc6
Author: xyuanlu <xy...@gmail.com>
AuthorDate: Wed May 17 20:30:16 2023 -0700
Add find and remove for recursive persist listener trie (#2460)
ZkPathRecursiveWatcherTrie will be used as a registry of persistent recursive watchers. When persist recursive watcher is registered on path /X, ZK will send out data change for any data/child changing under the tree structure of /X. The event only include the path of changed ZNode. In ZkClient, when ever we get a dataChange event for path /X/Y/Z, we need to track back the path and notify all registered recursive persist listener and notify them about the node change.
ref: https://zookeeper.apache.org/doc/r3.7.1/zookeeperProgrammers.html#sc_WatchPersistentRecursive
---
.../zkclient/util/ZkPathRecursiveWatcherTrie.java | 82 ++++++++++++++++++++--
.../util/TestZkPathRecursiveWatcherTrie.java | 17 +++--
2 files changed, 89 insertions(+), 10 deletions(-)
diff --git a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/util/ZkPathRecursiveWatcherTrie.java b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/util/ZkPathRecursiveWatcherTrie.java
index 6bdf0cceb..9adcb3ac1 100644
--- a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/util/ZkPathRecursiveWatcherTrie.java
+++ b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/util/ZkPathRecursiveWatcherTrie.java
@@ -31,7 +31,6 @@ import java.util.stream.Stream;
import com.google.common.annotations.VisibleForTesting;
import org.apache.helix.zookeeper.zkclient.RecursivePersistListener;
-
/**
* ZkPathRecursiveWatcherTrie will be used as a registry of persistent recursive watchers.
* When persist recursive watcher is registered on path /X, ZK will send out data change for any
@@ -139,17 +138,92 @@ public class ZkPathRecursiveWatcherTrie {
if (path.isEmpty()) {
throw new IllegalArgumentException("Empty path: " + path);
}
- final List<String> pathComponents = split(path);
+ final List<String> pathComponents = split(path);
- synchronized(this) {
+ synchronized (this) {
TrieNode parent = _rootNode;
for (final String part : pathComponents) {
- parent = parent.getChildren().computeIfAbsent(part, (p)-> new TrieNode(part) );
+ parent = parent.getChildren().computeIfAbsent(part, TrieNode::new);
}
parent._recursiveListeners.add(listener);
}
}
+ public Set<RecursivePersistListener> getAllRecursiveListeners(String path) {
+ Objects.requireNonNull(path, "Path cannot be null");
+
+ final List<String> pathComponents = split(path);
+ Set<RecursivePersistListener> result = new HashSet<>();
+ synchronized (this) {
+ TrieNode cur = _rootNode;
+ for (final String element : pathComponents) {
+ cur = cur.getChild(element);
+ if (cur == null) {
+ break;
+ }
+ result.addAll(cur.getRecursiveListeners());
+ }
+ }
+
+ return result;
+ }
+
+ /**
+ * Removing a RecursivePersistWatcherListener on a path.
+ *
+ * Delete a path from the nearest trie node to current node if this is the only listener and there
+ * is no child on the trie node.
+ *
+ * @param path the of the lister registered
+ * @param listener the RecursivePersistListener to be removed
+ */
+ public void removeRecursiveListener(final String path, RecursivePersistListener listener) {
+ Objects.requireNonNull(path, "Path cannot be null");
+
+ if (path.length() == 0) {
+ throw new IllegalArgumentException("Invalid path: " + path);
+ }
+ final List<String> pathComponents = split(path);
+
+ synchronized (this) {
+ TrieNode cur = _rootNode;
+ TrieNode highestNodeForDelete =
+ null; // track the highest node that from that node to leaf node.
+ TrieNode prevDeletable = _rootNode;
+ for (final String part : pathComponents) {
+ cur = cur.getChild(part);
+ if (cur == null) {
+ return;
+ }
+ // Every time when we move down one level of trie node, 3 pointers may be updated.
+ // highestNodeForDelete, prev of highestNodeForDelete and cur TrieNode.
+ // we invalidate `highestNodeForDelete` when cur node is non leaf node but has more than one
+ // children or listener, or it is leaf node and has more than one listener.
+
+ boolean candidateToDelete =
+ (cur.getChildren().size() == 1 && cur.getRecursiveListeners().isEmpty()) || (
+ cur.getChildren().isEmpty() && cur.getRecursiveListeners().size() == 1 && cur
+ .getRecursiveListeners().contains(listener));
+ if (candidateToDelete) {
+ if (highestNodeForDelete == null) {
+ highestNodeForDelete = cur;
+ }
+ } else {
+ prevDeletable = cur;
+ highestNodeForDelete = null;
+ }
+ }
+ if (!cur.getRecursiveListeners().contains(listener)) {
+ return;
+ }
+ cur.getRecursiveListeners().remove(listener);
+
+ if (highestNodeForDelete != null) {
+ prevDeletable.getChildren().remove(highestNodeForDelete.getValue());
+ }
+ }
+ }
+
/**
* Clear all nodes in the trie.
*/
diff --git a/zookeeper-api/src/test/java/org/apache/helix/zookeeper/zkclient/util/TestZkPathRecursiveWatcherTrie.java b/zookeeper-api/src/test/java/org/apache/helix/zookeeper/zkclient/util/TestZkPathRecursiveWatcherTrie.java
index 83d215cf6..ca0198c75 100644
--- a/zookeeper-api/src/test/java/org/apache/helix/zookeeper/zkclient/util/TestZkPathRecursiveWatcherTrie.java
+++ b/zookeeper-api/src/test/java/org/apache/helix/zookeeper/zkclient/util/TestZkPathRecursiveWatcherTrie.java
@@ -49,7 +49,7 @@ public class TestZkPathRecursiveWatcherTrie {
*
*/
@org.testng.annotations.Test
- public void testAddRemoveWatcher() {
+ public void testAddRemoveGetWatcher() {
System.out.println("START testAddRemoveWatcher at " + new Date(System.currentTimeMillis()));
_recursiveWatcherTrie.addRecursiveListener("/a/b/c/d", new Test());
_recursiveWatcherTrie.addRecursiveListener("/a/b/c/d1", new Test());
@@ -70,13 +70,17 @@ public class TestZkPathRecursiveWatcherTrie {
Assert.assertEquals(
_recursiveWatcherTrie.getRootNode().getChild("a").getChild("b3").getChild("c").getChild("d")
.getChild("e").getChild("f").getRecursiveListeners().size(), 2);
+ Assert.assertEquals(_recursiveWatcherTrie.getAllRecursiveListeners("a/b3/c/d/e/f/g/h").size(), 2);
+ Assert.assertEquals(_recursiveWatcherTrie.getAllRecursiveListeners("a/b/c/d/e/f/g/h").size(), 2);
- /* _recursiveWatcherTrie.removeRecursiveListener("/a/b3/c/d/e/f", listenerOnf_1); // step [1]
+ _recursiveWatcherTrie.removeRecursiveListener("/a/b3/c/d/e/f", listenerOnf_1); // step [1]
_recursiveWatcherTrie.removeRecursiveListener("/a/b2", listenerOnb2); // step[2]
//b2 will be removed. node "a" should have 2 children, b and b3.
Assert.assertEquals(_recursiveWatcherTrie.getRootNode().getChild("a").getChildren().size(), 2);
- Assert.assertTrue(_recursiveWatcherTrie.getRootNode().getChild("a").getChildren().contains("b3"));
- Assert.assertTrue(_recursiveWatcherTrie.getRootNode().getChild("a").getChildren().contains("b"));
+ Assert.assertTrue(
+ _recursiveWatcherTrie.getRootNode().getChild("a").getChildren().containsKey("b3"));
+ Assert.assertTrue(
+ _recursiveWatcherTrie.getRootNode().getChild("a").getChildren().containsKey("b"));
// path "/a/b3/c/d/e/f still exists with end node "f" has one listener
Assert.assertEquals(
_recursiveWatcherTrie.getRootNode().getChild("a").getChild("b3").getChild("c").getChild("d")
@@ -87,10 +91,11 @@ public class TestZkPathRecursiveWatcherTrie {
// removing all listeners of /a/b3/c/d/e/f.
_recursiveWatcherTrie.removeRecursiveListener("/a/b3/c/d/e/f", listenerOnf_1); // test no op
- _recursiveWatcherTrie.removeRecursiveListener("/a/b3/c/d/e/f", listenerOnf_2);
+ _recursiveWatcherTrie.removeRecursiveListener("/a/b3/c/d/e/f", listenerOnf_2); // step [3]
// b3 should be removed as well as all children nodes of b3
Assert.assertEquals(_recursiveWatcherTrie.getRootNode().getChild("a").getChildren().size(), 1);
- */
+ // node f should have 0 listeners
+ Assert.assertEquals(_recursiveWatcherTrie.getAllRecursiveListeners("a/b3/c/d/e/f/g/h").size(), 0);
}
class Test implements RecursivePersistListener {