You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by xy...@apache.org on 2023/05/18 03:30:23 UTC

[helix] branch metaclient updated: Add find and remove for recursive persist listener trie (#2460)

This is an automated email from the ASF dual-hosted git repository.

xyuanlu pushed a commit to branch metaclient
in repository https://gitbox.apache.org/repos/asf/helix.git


The following commit(s) were added to refs/heads/metaclient by this push:
     new e224e0e81 Add find and remove for recursive persist listener trie  (#2460)
e224e0e81 is described below

commit e224e0e81cea7c86cd1a9ddfe2d4da2cc4734dc6
Author: xyuanlu <xy...@gmail.com>
AuthorDate: Wed May 17 20:30:16 2023 -0700

    Add find and remove for recursive persist listener trie  (#2460)
    
    ZkPathRecursiveWatcherTrie will be used as a registry of persistent recursive watchers. When a persistent recursive watcher is registered on path /X, ZK will send out a data change event for any data/child change under the tree structure of /X. The event only includes the path of the changed ZNode. In ZkClient, whenever we get a dataChange event for path /X/Y/Z, we need to trace back up the path and notify all registered recursive persist listeners about the node change.
    ref: https://zookeeper.apache.org/doc/r3.7.1/zookeeperProgrammers.html#sc_WatchPersistentRecursive
---
 .../zkclient/util/ZkPathRecursiveWatcherTrie.java  | 82 ++++++++++++++++++++--
 .../util/TestZkPathRecursiveWatcherTrie.java       | 17 +++--
 2 files changed, 89 insertions(+), 10 deletions(-)

diff --git a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/util/ZkPathRecursiveWatcherTrie.java b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/util/ZkPathRecursiveWatcherTrie.java
index 6bdf0cceb..9adcb3ac1 100644
--- a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/util/ZkPathRecursiveWatcherTrie.java
+++ b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/util/ZkPathRecursiveWatcherTrie.java
@@ -31,7 +31,6 @@ import java.util.stream.Stream;
 import com.google.common.annotations.VisibleForTesting;
 import org.apache.helix.zookeeper.zkclient.RecursivePersistListener;
 
-
 /**
  * ZkPathRecursiveWatcherTrie will be used as a registry of persistent recursive watchers.
  * When persist recursive watcher is registered on path /X, ZK will send out data change for any
@@ -139,17 +138,92 @@ public class ZkPathRecursiveWatcherTrie {
     if (path.isEmpty()) {
       throw new IllegalArgumentException("Empty path: " + path);
     }
-    final List<String>  pathComponents = split(path);
+    final List<String> pathComponents = split(path);
 
-    synchronized(this) {
+    synchronized (this) {
       TrieNode parent = _rootNode;
       for (final String part : pathComponents) {
-        parent = parent.getChildren().computeIfAbsent(part, (p)-> new TrieNode(part) );
+        parent = parent.getChildren().computeIfAbsent(part, TrieNode::new);
       }
       parent._recursiveListeners.add(listener);
     }
   }
 
+  public Set<RecursivePersistListener> getAllRecursiveListeners(String path) {
+    Objects.requireNonNull(path, "Path cannot be null");
+
+    final List<String> pathComponents = split(path);
+    Set<RecursivePersistListener> result = new HashSet<>();
+    synchronized (this) {
+      TrieNode cur = _rootNode;
+      for (final String element : pathComponents) {
+        cur = cur.getChild(element);
+        if (cur == null) {
+          break;
+        }
+        result.addAll(cur.getRecursiveListeners());
+      }
+    }
+
+    return result;
+  }
+
+  /**
+   * Removes a RecursivePersistListener registered on a path.
+   *
+   * If it was the only listener on the node and the node has no children, the node is pruned from
+   * the trie along with any ancestors that are left with no listeners and no other children.
+   *
+   * @param path the path on which the listener was registered
+   * @param listener the RecursivePersistListener to be removed
+   */
+  public void removeRecursiveListener(final String path, RecursivePersistListener listener) {
+    Objects.requireNonNull(path, "Path cannot be null");
+
+    if (path.length() == 0) {
+      throw new IllegalArgumentException("Invalid path: " + path);
+    }
+    final List<String> pathComponents = split(path);
+
+    synchronized (this) {
+      TrieNode cur = _rootNode;
+      TrieNode highestNodeForDelete =
+          null; // topmost node of the chain, ending at the target node, that can be pruned.
+      TrieNode prevDeletable = _rootNode;
+      for (final String part : pathComponents) {
+        cur = cur.getChild(part);
+        if (cur == null) {
+          return;
+        }
+        // Each time we move down one level in the trie, up to 3 pointers may be updated:
+        // highestNodeForDelete, prevDeletable (the parent of highestNodeForDelete) and cur.
+        // We reset `highestNodeForDelete` when cur is a non-leaf node that has more than one
+        // child or any listener, or a leaf node holding anything but exactly the given listener.
+
+        boolean candidateToDelete =
+            (cur.getChildren().size() == 1 && cur.getRecursiveListeners().isEmpty()) || (
+                cur.getChildren().isEmpty() && cur.getRecursiveListeners().size() == 1 && cur
+                    .getRecursiveListeners().contains(listener));
+        if (candidateToDelete) {
+          if (highestNodeForDelete == null) {
+            highestNodeForDelete = cur;
+          }
+        } else {
+          prevDeletable = cur;
+          highestNodeForDelete = null;
+        }
+      }
+      if (!cur.getRecursiveListeners().contains(listener)) {
+        return;
+      }
+      cur.getRecursiveListeners().remove(listener);
+
+      if (highestNodeForDelete != null) {
+        prevDeletable.getChildren().remove(highestNodeForDelete.getValue());
+      }
+    }
+  }
+
   /**
    * Clear all nodes in the trie.
    */
diff --git a/zookeeper-api/src/test/java/org/apache/helix/zookeeper/zkclient/util/TestZkPathRecursiveWatcherTrie.java b/zookeeper-api/src/test/java/org/apache/helix/zookeeper/zkclient/util/TestZkPathRecursiveWatcherTrie.java
index 83d215cf6..ca0198c75 100644
--- a/zookeeper-api/src/test/java/org/apache/helix/zookeeper/zkclient/util/TestZkPathRecursiveWatcherTrie.java
+++ b/zookeeper-api/src/test/java/org/apache/helix/zookeeper/zkclient/util/TestZkPathRecursiveWatcherTrie.java
@@ -49,7 +49,7 @@ public class TestZkPathRecursiveWatcherTrie {
    *
    */
   @org.testng.annotations.Test
-  public void testAddRemoveWatcher() {
+  public void testAddRemoveGetWatcher() {
     System.out.println("START testAddRemoveWatcher at " + new Date(System.currentTimeMillis()));
     _recursiveWatcherTrie.addRecursiveListener("/a/b/c/d", new Test());
     _recursiveWatcherTrie.addRecursiveListener("/a/b/c/d1", new Test());
@@ -70,13 +70,17 @@ public class TestZkPathRecursiveWatcherTrie {
     Assert.assertEquals(
         _recursiveWatcherTrie.getRootNode().getChild("a").getChild("b3").getChild("c").getChild("d")
             .getChild("e").getChild("f").getRecursiveListeners().size(), 2);
+    Assert.assertEquals(_recursiveWatcherTrie.getAllRecursiveListeners("a/b3/c/d/e/f/g/h").size(), 2);
+    Assert.assertEquals(_recursiveWatcherTrie.getAllRecursiveListeners("a/b/c/d/e/f/g/h").size(), 2);
 
-   /* _recursiveWatcherTrie.removeRecursiveListener("/a/b3/c/d/e/f", listenerOnf_1); // step [1]
+    _recursiveWatcherTrie.removeRecursiveListener("/a/b3/c/d/e/f", listenerOnf_1); // step [1]
     _recursiveWatcherTrie.removeRecursiveListener("/a/b2", listenerOnb2);          //  step[2]
     //b2 will be removed. node "a" should have 2 children, b and b3.
     Assert.assertEquals(_recursiveWatcherTrie.getRootNode().getChild("a").getChildren().size(), 2);
-    Assert.assertTrue(_recursiveWatcherTrie.getRootNode().getChild("a").getChildren().contains("b3"));
-    Assert.assertTrue(_recursiveWatcherTrie.getRootNode().getChild("a").getChildren().contains("b"));
+    Assert.assertTrue(
+        _recursiveWatcherTrie.getRootNode().getChild("a").getChildren().containsKey("b3"));
+    Assert.assertTrue(
+        _recursiveWatcherTrie.getRootNode().getChild("a").getChildren().containsKey("b"));
     // path "/a/b3/c/d/e/f still exists with end node "f" has one listener
     Assert.assertEquals(
         _recursiveWatcherTrie.getRootNode().getChild("a").getChild("b3").getChild("c").getChild("d")
@@ -87,10 +91,11 @@ public class TestZkPathRecursiveWatcherTrie {
 
     // removing all listeners of /a/b3/c/d/e/f.
     _recursiveWatcherTrie.removeRecursiveListener("/a/b3/c/d/e/f", listenerOnf_1); // test no op
-    _recursiveWatcherTrie.removeRecursiveListener("/a/b3/c/d/e/f", listenerOnf_2);
+    _recursiveWatcherTrie.removeRecursiveListener("/a/b3/c/d/e/f", listenerOnf_2); // step [3]
     // b3 should be removed as well as all children nodes of b3
     Assert.assertEquals(_recursiveWatcherTrie.getRootNode().getChild("a").getChildren().size(), 1);
- */
+    // node f should have 0 listeners
+    Assert.assertEquals(_recursiveWatcherTrie.getAllRecursiveListeners("a/b3/c/d/e/f/g/h").size(), 0);
   }
 
   class Test implements RecursivePersistListener {