You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by xy...@apache.org on 2023/04/26 04:03:49 UTC

[helix] branch metaclient updated: Add a Trie class to represent RecursivePersistWatcherListener in ZkClient (#2439)

This is an automated email from the ASF dual-hosted git repository.

xyuanlu pushed a commit to branch metaclient
in repository https://gitbox.apache.org/repos/asf/helix.git


The following commit(s) were added to refs/heads/metaclient by this push:
     new 6d42ee64e Add a Trie class to represent RecursivePersistWatcherListener in ZkClient (#2439)
6d42ee64e is described below

commit 6d42ee64e422232dc93f96c31a349cb200ea7c62
Author: xyuanlu <xy...@gmail.com>
AuthorDate: Tue Apr 25 21:03:42 2023 -0700

    Add a Trie class to represent RecursivePersistWatcherListener in ZkClient (#2439)
    
    ZkPathRecursiveWatcherTrie will be used as a registry for ZK persistent recursive watchers.
    When a persistent recursive watcher is registered on path /X, ZK will send out a data change event for any data/child change under the tree structure of /X. The event only includes the path of the changed ZNode.
    
    In ZkClient, whenever we get a dataChange event for path /X/Y/Z, we need to trace back along the path and notify all registered recursive persist listeners about the node change.
    ref: https://zookeeper.apache.org/doc/r3.7.1/zookeeperProgrammers.html#sc_WatchPersistentRecursive
---
 .../zkclient/RecursivePersistListener.java         |  41 +++++
 .../zkclient/util/ZkPathRecursiveWatcherTrie.java  | 169 +++++++++++++++++++++
 .../util/TestZkPathRecursiveWatcherTrie.java       | 104 +++++++++++++
 3 files changed, 314 insertions(+)

diff --git a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/RecursivePersistListener.java b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/RecursivePersistListener.java
new file mode 100644
index 000000000..96c09b340
--- /dev/null
+++ b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/RecursivePersistListener.java
@@ -0,0 +1,41 @@
+package org.apache.helix.zookeeper.zkclient;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import org.apache.zookeeper.Watcher;
+
+/**
+ * A {@link RecursivePersistListener} can be registered at a {@link ZkClient} to listen for all
+ * ZNode changes in the tree structure under a given path.
+ *
+ * The listener is a persistent listener, so there is no need to resubscribe.
+ *
+ */
+public interface RecursivePersistListener {
+  /**
+   * Invoked when a node is added or removed, or node data changes, in the tree structure under
+   * the path this RecursivePersistListener is subscribed to.
+   * @param dataPath the path of the ZNode on which the change happened
+   * @param eventType the event type, including NodeCreated, NodeDataChanged and NodeDeleted
+   * @throws Exception if handling the change fails (implementation-defined)
+   */
+  public void handleZNodeChange(String dataPath, Watcher.Event.EventType eventType)
+      throws Exception;
+}
diff --git a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/util/ZkPathRecursiveWatcherTrie.java b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/util/ZkPathRecursiveWatcherTrie.java
new file mode 100644
index 000000000..6bdf0cceb
--- /dev/null
+++ b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/util/ZkPathRecursiveWatcherTrie.java
@@ -0,0 +1,169 @@
+package org.apache.helix.zookeeper.zkclient.util;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.helix.zookeeper.zkclient.RecursivePersistListener;
+
+
+/**
+ * ZkPathRecursiveWatcherTrie will be used as a registry of persistent recursive watchers.
+ * When a persistent recursive watcher is registered on path /X, ZK will send out a data change
+ * event for any data/child change under the tree structure of /X. The event only includes the
+ * path of the changed ZNode.
+ * In ZkClient, whenever we get a dataChange event for path /X/Y/Z, we need to trace back along the
+ * path and notify all registered recursive persist listeners about the node change.
+ * ref: https://zookeeper.apache.org/doc/r3.7.1/zookeeperProgrammers.html#sc_WatchPersistentRecursive
+ */
+public class ZkPathRecursiveWatcherTrie {
+
+  /** Root node of the trie; it stands for the path "/" */
+  private final TrieNode _rootNode;
+
+  static class TrieNode {
+
+    final String _value;   // Path segment of the ZNode at the current level
+    // Maps the path segment of each next-level ZNode to its TrieNode. The initial capacity of 4
+    // is meant to match the initial children size used by the ZK server.
+    final Map<String, TrieNode> _children = new HashMap<>(4);
+    // The set of recursive persist listeners registered on the current path
+    Set<RecursivePersistListener> _recursiveListeners = new HashSet<>(4);
+
+    /**
+     * Create a trie node holding the given path segment.
+     *
+     * @param value the value stored in this node
+     */
+    private TrieNode(String value) {
+      _value = value;
+    }
+
+    /**
+     * Get the path segment stored in this node.
+     *
+     * @return the value stored in this node
+     */
+    public String getValue() {
+      return this._value;
+    }
+
+    /**
+     * Add a child to this node; an existing child with the same name is kept, not replaced.
+     *
+     * @param childName the string name of the child
+     * @param node the node that is the child
+     */
+    void addChild(String childName, TrieNode node) {
+      this._children.putIfAbsent(childName, node);
+    }
+
+    /**
+     * Return the child of this node that is mapped to the input child name.
+     *
+     * @param childName the name of the child
+     * @return the child node, or null if this node has no child with that name
+     */
+    @VisibleForTesting
+    TrieNode getChild(String childName) {
+      return this._children.get(childName);
+    }
+
+    /**
+     * Get the children of this trie node.
+     *
+     * @return the node's own (mutable) map of child name to child node, not a copy
+     */
+    @VisibleForTesting
+    Map<String, TrieNode> getChildren() {
+      return _children;
+    }
+
+    /**
+     * Get the set of RecursivePersistListener registered on this node.
+     * Returns an empty set if no listener is registered on the path
+     * @return the node's own set of listeners, not a copy
+     */
+    @VisibleForTesting
+    Set<RecursivePersistListener> getRecursiveListeners() {
+      return _recursiveListeners;
+    }
+
+    @Override
+    public String toString() {
+      return "TrieNode [name=" + _value + ", children=" + _children.keySet() + "]";
+    }
+  }
+
+  /**
+   * Construct a new trie that contains only the root node "/".
+   */
+  public ZkPathRecursiveWatcherTrie() {
+    this._rootNode = new TrieNode( "/");
+  }
+
+  /**
+   * Register a listener on a path, creating any missing trie nodes along the path.
+   * Blank segments are ignored, so a path of only '/' registers the listener on the root node.
+   * @param path the '/'-separated path to register on; must not be null or empty
+   * @param listener the RecursivePersistListener to be added
+   */
+  public void addRecursiveListener(final String path, RecursivePersistListener listener) {
+    Objects.requireNonNull(path, "Path cannot be null");
+
+    if (path.isEmpty()) {
+      throw new IllegalArgumentException("Empty path: " + path);
+    }
+    final List<String>  pathComponents = split(path);
+
+    synchronized(this) {
+      TrieNode parent = _rootNode;
+      for (final String part : pathComponents) {
+        parent = parent.getChildren().computeIfAbsent(part, (p)-> new TrieNode(part) );
+      }
+      parent._recursiveListeners.add(listener);
+    }
+  }
+
+  /**
+   * Remove every child node of the root; listeners held by the root node itself are kept.
+   */
+  public synchronized void clear() {
+    _rootNode.getChildren().clear();
+  }
+
+  private static List<String> split(final String path) {
+    return Stream.of(path.split("/")).filter(t -> !t.trim().isEmpty()).collect(Collectors.toList());
+  }
+
+  // only for test
+  @VisibleForTesting
+  TrieNode getRootNode() {
+    return _rootNode;
+  }
+}
diff --git a/zookeeper-api/src/test/java/org/apache/helix/zookeeper/zkclient/util/TestZkPathRecursiveWatcherTrie.java b/zookeeper-api/src/test/java/org/apache/helix/zookeeper/zkclient/util/TestZkPathRecursiveWatcherTrie.java
new file mode 100644
index 000000000..83d215cf6
--- /dev/null
+++ b/zookeeper-api/src/test/java/org/apache/helix/zookeeper/zkclient/util/TestZkPathRecursiveWatcherTrie.java
@@ -0,0 +1,104 @@
+package org.apache.helix.zookeeper.zkclient.util;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.Date;
+
+import org.apache.helix.zookeeper.zkclient.RecursivePersistListener;
+import org.apache.zookeeper.Watcher;
+import org.testng.Assert;
+
+
+public class TestZkPathRecursiveWatcherTrie {
+  ZkPathRecursiveWatcherTrie _recursiveWatcherTrie = new ZkPathRecursiveWatcherTrie();
+
+  /**
+   * Test case creates a trie of the following structure. Each '*' stands for one listener added
+   * on that path.
+   * [x] means a listener is removed at step 'x', as indicated in the test comments.
+   *
+   *                         "/"
+   *                         a
+   *                    /   |        \
+   *                  b*    b2* [2]    b3
+   *                 /                 \
+   *                c                   c
+   *          / /   |  \  \              \
+   *        d* d1* d2* d3* d4*           d
+   *                                      \
+   *                                      e
+   *                                      \
+   *                                      f**   [1] [3]
+   *
+   */
+  @org.testng.annotations.Test
+  public void testAddRemoveWatcher() {
+    System.out.println("START testAddRemoveWatcher at " + new Date(System.currentTimeMillis()));
+    _recursiveWatcherTrie.addRecursiveListener("/a/b/c/d", new Test());
+    _recursiveWatcherTrie.addRecursiveListener("/a/b/c/d1", new Test());
+    _recursiveWatcherTrie.addRecursiveListener("/a/b/c/d2", new Test());
+    _recursiveWatcherTrie.addRecursiveListener("/a/b/c/d3", new Test());
+    _recursiveWatcherTrie.addRecursiveListener("/a/b/c/d4", new Test());
+
+    Test listenerOnb = new Test();
+    _recursiveWatcherTrie.addRecursiveListener("/a/b", listenerOnb);
+    Test listenerOnb2 = new Test();
+    _recursiveWatcherTrie.addRecursiveListener("/a/b2", listenerOnb2);
+    Test listenerOnf_1 = new Test();
+    _recursiveWatcherTrie.addRecursiveListener("/a/b3/c/d/e/f", listenerOnf_1);
+    Test listenerOnf_2 = new Test();
+    _recursiveWatcherTrie.addRecursiveListener("/a/b3/c/d/e/f", listenerOnf_2);
+
+    // node f should have 2 listeners
+    Assert.assertEquals(
+        _recursiveWatcherTrie.getRootNode().getChild("a").getChild("b3").getChild("c").getChild("d")
+            .getChild("e").getChild("f").getRecursiveListeners().size(), 2);
+
+   /* _recursiveWatcherTrie.removeRecursiveListener("/a/b3/c/d/e/f", listenerOnf_1); // step [1]
+    _recursiveWatcherTrie.removeRecursiveListener("/a/b2", listenerOnb2);          //  step[2]
+    //b2 will be removed. node "a" should have 2 children, b and b3.
+    Assert.assertEquals(_recursiveWatcherTrie.getRootNode().getChild("a").getChildren().size(), 2);
+    Assert.assertTrue(_recursiveWatcherTrie.getRootNode().getChild("a").getChildren().contains("b3"));
+    Assert.assertTrue(_recursiveWatcherTrie.getRootNode().getChild("a").getChildren().contains("b"));
+    // path "/a/b3/c/d/e/f still exists with end node "f" has one listener
+    Assert.assertEquals(
+        _recursiveWatcherTrie.getRootNode().getChild("a").getChild("b3").getChild("c").getChild("d")
+            .getChild("e").getChildren().size(), 1);
+    Assert.assertEquals(
+        _recursiveWatcherTrie.getRootNode().getChild("a").getChild("b3").getChild("c").getChild("d")
+            .getChild("e").getChild("f").getRecursiveListeners().size(), 1);
+
+    // removing all listeners of /a/b3/c/d/e/f.
+    _recursiveWatcherTrie.removeRecursiveListener("/a/b3/c/d/e/f", listenerOnf_1); // test no op
+    _recursiveWatcherTrie.removeRecursiveListener("/a/b3/c/d/e/f", listenerOnf_2);
+    // b3 should be removed as well as all children nodes of b3
+    Assert.assertEquals(_recursiveWatcherTrie.getRootNode().getChild("a").getChildren().size(), 1);
+ */
+  }
+
+  class Test implements RecursivePersistListener {
+
+    @Override
+    public void handleZNodeChange(String dataPath, Watcher.Event.EventType eventType)
+        throws Exception {
+
+    }
+  }
+}