Posted to reviews@helix.apache.org by "xyuanlu (via GitHub)" <gi...@apache.org> on 2023/05/24 16:50:43 UTC

[GitHub] [helix] xyuanlu opened a new pull request, #2506: ZkClient add recursive persist listener implementation

xyuanlu opened a new pull request, #2506:
URL: https://github.com/apache/helix/pull/2506

   ### Issues
   
   - [X] My PR addresses the following Helix issues and references them in the PR description:
   #2237 
   
   ### Description
   
   - [ ] Here are some details about my PR, including screenshots of any UI changes:
   
   (Write a concise description including what, why, how)
   
   ### Tests
   
   - [ ] The following tests are written for this issue:
   
   (List the names of added unit/integration tests)
   
   - The following is the result of the "mvn test" command on the appropriate module:
   
   (If CI test fails due to known issue, please specify the issue and test PR locally. Then copy & paste the result of "mvn test" to here.)
   
   ### Changes that Break Backward Compatibility (Optional)
   
   - My PR contains changes that break backward compatibility or previous assumptions for certain methods or API. They include:
   
   (Consider including all behavior changes for public methods or API. Also include these changes in merge description so that other developers are aware of these changes. This allows them to make relevant code changes in feature branches accounting for the new method/API behavior.)
   
   ### Documentation (Optional)
   
   - In case of new functionality, my PR adds documentation in the following wiki page:
   
   (Link the GitHub wiki you added)
   
   ### Commits
   
   - My commits all reference appropriate Apache Helix GitHub issues in their subject lines. In addition, my commits follow the guidelines from "[How to write a good git commit message](http://chris.beams.io/posts/git-commit/)":
     1. Subject is separated from body by a blank line
     1. Subject is limited to 50 characters (not including Jira issue reference)
     1. Subject does not end with a period
     1. Subject uses the imperative mood ("add", not "adding")
     1. Body wraps at 72 characters
     1. Body explains "what" and "why", not "how"
   
   ### Code Quality
   
   - My diff has been formatted using helix-style.xml 
   (helix-style-intellij.xml if IntelliJ IDE is used)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@helix.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@helix.apache.org
For additional commands, e-mail: reviews-help@helix.apache.org


[GitHub] [helix] desaikomal commented on a diff in pull request #2506: ZkClient add recursive persist listener implementation

Posted by "desaikomal (via GitHub)" <gi...@apache.org>.
desaikomal commented on code in PR #2506:
URL: https://github.com/apache/helix/pull/2506#discussion_r1212071692


##########
meta-client/src/main/java/org/apache/helix/metaclient/impl/zk/ZkMetaClient.java:
##########
@@ -332,15 +333,13 @@ public boolean subscribeStateChanges(ConnectStateChangeListener listener) {
     return true;
   }
 
-  // TODO: add impl and remove UnimplementedException
   @Override
   public boolean subscribeChildChanges(String key, ChildChangeListener listener, boolean skipWatchingNonExistNode) {
-    try {
-      _zkClient.subscribePersistRecursiveWatcher(key, new ChildListenerAdapter(listener));
-    } catch (KeeperException.UnimplementedException e) {
-      LOG.error(e.getLocalizedMessage());
+    if (skipWatchingNonExistNode && exists(key) == null) {
+      return false;
     }
-    return false;
+    _zkClient.subscribePersistRecursiveListener(key, new ChildListenerAdapter(listener));

Review Comment:
   Is there any exception that `subscribePersistRecursiveListener` can throw? Just wondering.



##########
zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/ZkClient.java:
##########
@@ -343,17 +343,54 @@ public void subscribeDataChanges(String path, IZkDataListener listener) {
    * Subscribe RecursivePersistListener for a particular path. User can only subscribe when
    * `_usePersistWatcher` is set to true and there is no pre-existing watcher on the path.
    */
-  // TODO: Add impl and remove exception
-  public boolean subscribePersistRecursiveWatcher(String path,
-      RecursivePersistListener recursivePersistListener)
-      throws KeeperException.UnimplementedException {
-    throw new KeeperException.UnimplementedException();
+  public void subscribePersistRecursiveListener(String path,
+      RecursivePersistListener recursivePersistListener) {
+    if (!_usePersistWatcher) {

Review Comment:
   This does throw an exception. In that case, should the higher-level call return `false`, or do we need the exception to pass through?
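
The two options raised in this comment can be sketched as follows. This is a minimal illustration, not the PR's code: `Subscriber`, `subscribePassThrough`, and `subscribeSwallow` are hypothetical names, and the `exists` flag stands in for the `exists(key) == null` check in the hunk above.

```java
class SubscribeResultSketch {
  // Hypothetical stand-in for the underlying subscribe call; it may throw
  // UnsupportedOperationException, as the ZkClient method in the diff does.
  interface Subscriber {
    void subscribe(String key);
  }

  // Option 1: the unchecked exception passes through to the caller.
  // false is reserved for "node absent, subscription skipped".
  static boolean subscribePassThrough(Subscriber s, String key, boolean skipIfAbsent,
      boolean exists) {
    if (skipIfAbsent && !exists) {
      return false;
    }
    s.subscribe(key);
    return true;
  }

  // Option 2: the failure is translated into false, which hides the cause.
  static boolean subscribeSwallow(Subscriber s, String key) {
    try {
      s.subscribe(key);
      return true;
    } catch (UnsupportedOperationException e) {
      return false;
    }
  }

  public static void main(String[] args) {
    Subscriber failing = k -> {
      throw new UnsupportedOperationException("existing listener on " + k);
    };
    System.out.println(subscribeSwallow(failing, "/base"));
    try {
      subscribePassThrough(failing, "/base", false, true);
    } catch (UnsupportedOperationException e) {
      System.out.println("caller sees: " + e.getMessage());
    }
  }
}
```

With pass-through, a `false` result keeps a single meaning (the node was absent and skipped), while a misconfiguration surfaces as an exception the caller can inspect.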



##########
zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/ZkClient.java:
##########
@@ -343,17 +343,54 @@ public void subscribeDataChanges(String path, IZkDataListener listener) {
    * Subscribe RecursivePersistListener for a particular path. User can only subscribe when
    * `_usePersistWatcher` is set to true and there is no pre-existing watcher on the path.
    */
-  // TODO: Add impl and remove exception
-  public boolean subscribePersistRecursiveWatcher(String path,
-      RecursivePersistListener recursivePersistListener)
-      throws KeeperException.UnimplementedException {
-    throw new KeeperException.UnimplementedException();
+  public void subscribePersistRecursiveListener(String path,
+      RecursivePersistListener recursivePersistListener) {
+    if (!_usePersistWatcher) {
+      throw new UnsupportedOperationException(
+          "Can not subscribe PersistRecursiveWatcher. Persist listener is not enabled.");
+    }
+
+    ManipulateListener addListener = () -> {
+      if (hasChildOrDataListeners(path)) {
+        throw new UnsupportedOperationException(
+            "Can not subscribe PersistRecursiveWatcher. There is an existing listener on " + path);
+      }
+      // subscribe a PERSISTENT_RECURSIVE listener on path. It throws exception if not successful
+      retryUntilConnected(() -> {
+        getConnection().addWatch(path, ZkClient.this, AddWatchMode.PERSISTENT_RECURSIVE);
+        return null;

Review Comment:
   nit (for my own understanding): why are you returning `null` here?
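
On the `return null` question, a minimal sketch, assuming `retryUntilConnected` accepts a `java.util.concurrent.Callable<T>` (the lambda shape in the diff suggests this, but the signature is not shown here). `Callable.call()` must return a value, so a lambda that wraps a void call has to end with `return null`. The `retry` helper and the `addWatch` stub are hypothetical stand-ins, not Helix code.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.atomic.AtomicInteger;

class CallableVoidSketch {
  static final AtomicInteger CALLS = new AtomicInteger();

  // Hypothetical retry helper that takes a Callable<T>, so every action must return a T.
  static <T> T retry(Callable<T> action, int maxAttempts) {
    RuntimeException last = new IllegalArgumentException("maxAttempts must be positive");
    for (int i = 0; i < maxAttempts; i++) {
      try {
        return action.call();
      } catch (IllegalStateException e) {
        last = e; // treated as transient in this sketch; try again
      } catch (Exception e) {
        throw new RuntimeException(e);
      }
    }
    throw last;
  }

  // Hypothetical void operation that fails on its first invocation.
  static void addWatch(String path) {
    if (CALLS.incrementAndGet() == 1) {
      throw new IllegalStateException("not connected yet");
    }
  }

  public static void main(String[] args) {
    Object result = retry(() -> {
      addWatch("/base");
      return null; // no useful value to return, but Callable requires one
    }, 3);
    System.out.println(result + " after " + CALLS.get() + " calls");
  }
}
```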



##########
zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/ZkClient.java:
##########
@@ -1807,6 +1838,23 @@ private void processDataOrChildChange(WatchedEvent event, long notificationTime)
         fireDataChangedEvents(event.getPath(), listeners, OptionalLong.of(notificationTime),
             pathExists, event.getType());
       }
+
+      // fire change event for persist recursive listener
+      if (_usePersistWatcher) {
+        Set<RecursivePersistListener> recListeners =
+            _zkPathRecursiveWatcherTrie.getAllRecursiveListeners(path);
+        if (recListeners != null && !recListeners.isEmpty()) {

Review Comment:
   Just my 2 cents: I'm not sure defensive coding is good here. It spends CPU checking something that will never be null.



##########
zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/util/ZkPathRecursiveWatcherTrie.java:
##########
@@ -224,6 +223,28 @@ public void removeRecursiveListener(final String path, RecursivePersistListener
     }
   }
 
+  /**
+   * Return if there is listener on a particular path
+   * @param path
+   * @return
+   */
+  public boolean hasListenerOnPath(String path) {
+    Objects.requireNonNull(path, "Path cannot be null");
+
+    final List<String> pathComponents = split(path);
+    TrieNode cur;
+    synchronized (this) {
+      cur = _rootNode;
+      for (final String element : pathComponents) {
+        cur = cur.getChild(element);
+        if (cur == null) {
+          break;
+        }
+      }
+    }
+    return cur!=null && !cur.getRecursiveListeners().isEmpty();

Review Comment:
   nit: cur != null (space before and after !=)
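
The lookup in the hunk above walks the trie one path component at a time and checks the final node only. A self-contained sketch of that idea follows; `PathTrieSketch`, `Node`, and `addListener` are illustrative names, and the real `ZkPathRecursiveWatcherTrie` may differ in details not shown in this thread.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;

class PathTrieSketch {
  private static final class Node {
    final Map<String, Node> children = new HashMap<>();
    final Set<Object> listeners = new HashSet<>();
  }

  private final Node root = new Node();

  // Split "/a/b" into ["a", "b"], ignoring empty components.
  private static List<String> split(String path) {
    List<String> parts = new ArrayList<>();
    for (String part : path.split("/")) {
      if (!part.isEmpty()) {
        parts.add(part);
      }
    }
    return parts;
  }

  synchronized void addListener(String path, Object listener) {
    Node cur = root;
    for (String part : split(path)) {
      cur = cur.children.computeIfAbsent(part, k -> new Node());
    }
    cur.listeners.add(listener);
  }

  // True only if a listener is registered on exactly this path, not on an ancestor.
  synchronized boolean hasListenerOnPath(String path) {
    Objects.requireNonNull(path, "Path cannot be null");
    Node cur = root;
    for (String part : split(path)) {
      cur = cur.children.get(part);
      if (cur == null) {
        return false;
      }
    }
    return !cur.listeners.isEmpty();
  }

  public static void main(String[] args) {
    PathTrieSketch trie = new PathTrieSketch();
    trie.addListener("/a/b", new Object());
    System.out.println(trie.hasListenerOnPath("/a/b"));
    System.out.println(trie.hasListenerOnPath("/a"));
  }
}
```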
   



##########
zookeeper-api/src/test/java/org/apache/helix/zookeeper/zkclient/TestZkClientPersistWatcher.java:
##########
@@ -116,38 +116,59 @@ void testZkClientPersistRecursiveChange() throws Exception {
     org.apache.helix.zookeeper.impl.client.ZkClient.Builder builder =
         new org.apache.helix.zookeeper.impl.client.ZkClient.Builder();
     builder.setZkServer(ZkTestBase.ZK_ADDR).setMonitorRootPathOnly(false)
-        .setUsePersistWatcher(false);
+        .setUsePersistWatcher(true);
     org.apache.helix.zookeeper.impl.client.ZkClient zkClient = builder.build();
     zkClient.setZkSerializer(new BasicZkSerializer(new SerializableSerializer()));
     int count = 100;
     final AtomicInteger[] event_count = {new AtomicInteger(0)};
     final AtomicInteger[] event_count2 = {new AtomicInteger(0)};
-    CountDownLatch countDownLatch1 = new CountDownLatch(count);
-    CountDownLatch countDownLatch2 = new CountDownLatch(count/2);
-    String path = "/base/testZkClientChildChange";
+    CountDownLatch countDownLatch1 = new CountDownLatch(count*4);

Review Comment:
   Can you add a comment explaining what the magic `*4` is for?





[GitHub] [helix] xyuanlu commented on a diff in pull request #2506: ZkClient add recursive persist listener implementation

Posted by "xyuanlu (via GitHub)" <gi...@apache.org>.
xyuanlu commented on code in PR #2506:
URL: https://github.com/apache/helix/pull/2506#discussion_r1204986803


##########
meta-client/src/main/java/org/apache/helix/metaclient/impl/zk/ZkMetaClient.java:
##########
@@ -335,12 +335,11 @@ public boolean subscribeStateChanges(ConnectStateChangeListener listener) {
   // TODO: add impl and remove UnimplementedException

Review Comment:
   TFTR! Thanks for the review. I will update.





[GitHub] [helix] rahulrane50 commented on a diff in pull request #2506: ZkClient add recursive persist listener implementation

Posted by "rahulrane50 (via GitHub)" <gi...@apache.org>.
rahulrane50 commented on code in PR #2506:
URL: https://github.com/apache/helix/pull/2506#discussion_r1204914109


##########
meta-client/src/main/java/org/apache/helix/metaclient/impl/zk/ZkMetaClient.java:
##########
@@ -335,12 +335,11 @@ public boolean subscribeStateChanges(ConnectStateChangeListener listener) {
   // TODO: add impl and remove UnimplementedException

Review Comment:
   nit: I think we can remove this TODO now.



##########
zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/ZkClient.java:
##########
@@ -343,17 +343,48 @@ public void subscribeDataChanges(String path, IZkDataListener listener) {
    * Subscribe RecursivePersistListener for a particular path. User can only subscribe when
    * `_usePersistWatcher` is set to true and there is no pre-existing watcher on the path.
    */
-  // TODO: Add impl and remove exception
-  public boolean subscribePersistRecursiveWatcher(String path,
-      RecursivePersistListener recursivePersistListener)
-      throws KeeperException.UnimplementedException {
-    throw new KeeperException.UnimplementedException();
+  public void subscribePersistRecursiveListener(String path,
+      RecursivePersistListener recursivePersistListener) {
+    if (!_usePersistWatcher) {
+      throw new UnsupportedOperationException(
+          "Can not subscribe PersistRecursiveWatcher. Persist listener is not enabled.");
+    }
+
+    ManipulateListener addListener = () -> {
+      if (hasChildOrDataListeners(path)) {
+        throw new UnsupportedOperationException(
+            "Can not subscribe PersistRecursiveWatcher. There is an existing listener on " + path);
+      }
+      _zkPathRecursiveWatcherTrie.addRecursiveListener(path, recursivePersistListener);
+      getConnection().addWatch(path, ZkClient.this, AddWatchMode.PERSISTENT_RECURSIVE);

Review Comment:
   Curious: what happens if we fail to add the watch on the ZK connection but have already updated the trie registry with the listener? I'm guessing we retry the addWatch operation indefinitely, but I'm asking because `subscribePersistRecursiveListener` is not an atomic operation now.



##########
zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/ZkClient.java:
##########
@@ -1807,6 +1838,23 @@ private void processDataOrChildChange(WatchedEvent event, long notificationTime)
         fireDataChangedEvents(event.getPath(), listeners, OptionalLong.of(notificationTime),
             pathExists, event.getType());
       }
+
+      // fire change event for persist recursive listener
+      if (_usePersistWatcher) {
+        Set<RecursivePersistListener> recListeners =
+            _zkPathRecursiveWatcherTrie.getAllRecursiveListeners(path);
+        if (recListeners != null && !recListeners.isEmpty()) {
+          for (final RecursivePersistListener listener : recListeners) {

Review Comment:
   Can we start all zk event threads here in parallel?



##########
zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/util/ZkPathRecursiveWatcherTrie.java:
##########
@@ -224,6 +223,30 @@ public void removeRecursiveListener(final String path, RecursivePersistListener
     }
   }
 
+  /**
+   *
+   * @param path
+   * @return
+   */
+  public Boolean hasListenerOnPath(String path) {
+    Objects.requireNonNull(path, "Path cannot be null");
+
+    final List<String> pathComponents = split(path);
+    Set<RecursivePersistListener> result = new HashSet<>();

Review Comment:
   what is this result used for?



##########
zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/ZkClient.java:
##########
@@ -3030,8 +3078,16 @@ interface ManipulateListener {
     void run() throws KeeperException, InterruptedException;
   }
 
+  // Add a persist listener on the path.
+  // Throws UnsupportedOperationException if there is already a recursive persist listener on the
+  // path because it will overwrite that recursive persist listener.
   private void addPersistListener(String path, Object listener) {
     ManipulateListener addListeners = () -> {
+      if (_zkPathRecursiveWatcherTrie.hasListenerOnPath(path)) {

Review Comment:
   Curious, we already check that in subscribing listeners right?



##########
zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/ZkClient.java:
##########
@@ -1807,6 +1838,23 @@ private void processDataOrChildChange(WatchedEvent event, long notificationTime)
         fireDataChangedEvents(event.getPath(), listeners, OptionalLong.of(notificationTime),
             pathExists, event.getType());
       }
+
+      // fire change event for persist recursive listener
+      if (_usePersistWatcher) {
+        Set<RecursivePersistListener> recListeners =
+            _zkPathRecursiveWatcherTrie.getAllRecursiveListeners(path);
+        if (recListeners != null && !recListeners.isEmpty()) {

Review Comment:
   Can recListeners even be null? I see that we always initialize it in the getter method.



##########
meta-client/src/main/java/org/apache/helix/metaclient/impl/zk/adapter/ChildListenerAdapter.java:
##########
@@ -39,7 +39,7 @@ public ChildListenerAdapter(ChildChangeListener listener) {
   private static ChildChangeListener.ChangeType convertType(Watcher.Event.EventType eventType) {
     switch (eventType) {
       case NodeCreated: return ChildChangeListener.ChangeType.ENTRY_CREATED;
-      case NodeChildrenChanged: return ChildChangeListener.ChangeType.ENTRY_DATA_CHANGE;

Review Comment:
   Is my understanding correct that the persistent recursive watcher here and the "NodeChildrenChanged" event in helix manager (callback handler events) are totally different? What I'm trying to understand: after the persistent recursive watcher change we would get a "NodeDataChanged" event instead of a "NodeChildrenChanged" event, but I can see that helix manager still fires events of type "NodeChildrenChanged".





[GitHub] [helix] xyuanlu commented on a diff in pull request #2506: ZkClient add recursive persist listener implementation

Posted by "xyuanlu (via GitHub)" <gi...@apache.org>.
xyuanlu commented on code in PR #2506:
URL: https://github.com/apache/helix/pull/2506#discussion_r1207538696


##########
meta-client/src/test/java/org/apache/helix/metaclient/impl/zk/TestZkMetaClient.java:
##########
@@ -397,7 +396,77 @@ public void handleDataChange(String key, Object data, ChangeType changeType)
       zkMetaClient.unsubscribeDataChange(basePath, listener);
       // verify that no listener is registered on any path
       watchers = TestUtil.getZkWatch(zkMetaClient.getZkClient());
+      Assert.assertEquals(watchers.get("persistentWatches").size(), 0);
+      Assert.assertEquals(watchers.get("childWatches").size(), 0);
+      Assert.assertEquals(watchers.get("dataWatches").size(), 0);
+
+    }
+  }
+
+  @Test
+  public void testChildChangeListener() throws Exception {
+    final String basePath = "/TestZkMetaClient_testChildChangeListener";
+    final int count = 100;
+    try (ZkMetaClient<String> zkMetaClient = createZkMetaClient()) {
+      zkMetaClient.connect();
+      CountDownLatch countDownLatch = new CountDownLatch(count*4);
+      ChildChangeListener listener = new ChildChangeListener() {
+
+        @Override
+        public void handleChildChange(String changedPath, ChangeType changeType) throws Exception {
+          countDownLatch.countDown();
+
+        }
+      };
+      zkMetaClient.create(basePath, "");
+      Assert.assertTrue(
+          zkMetaClient.subscribeChildChanges(basePath, listener, false)
+      );
+
+      DataChangeListener dummyDataListener = new DataChangeListener() {
+        @Override
+        public void handleDataChange(String key, Object data, ChangeType changeType)
+            throws Exception {
+        }
+      };
+      try {
+        zkMetaClient.subscribeDataChange(basePath, dummyDataListener, false);
+      } catch ( Exception ex) {
+        Assert.assertEquals(ex.getClass().getName(), "java.lang.UnsupportedOperationException");

Review Comment:
   TFTR! Updated.





[GitHub] [helix] xyuanlu merged pull request #2506: ZkClient add recursive persist listener implementation

Posted by "xyuanlu (via GitHub)" <gi...@apache.org>.
xyuanlu merged PR #2506:
URL: https://github.com/apache/helix/pull/2506




[GitHub] [helix] xyuanlu commented on a diff in pull request #2506: ZkClient add recursive persist listener implementation

Posted by "xyuanlu (via GitHub)" <gi...@apache.org>.
xyuanlu commented on code in PR #2506:
URL: https://github.com/apache/helix/pull/2506#discussion_r1204988275


##########
zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/ZkClient.java:
##########
@@ -343,17 +343,48 @@ public void subscribeDataChanges(String path, IZkDataListener listener) {
    * Subscribe RecursivePersistListener for a particular path. User can only subscribe when
    * `_usePersistWatcher` is set to true and there is no pre-existing watcher on the path.
    */
-  // TODO: Add impl and remove exception
-  public boolean subscribePersistRecursiveWatcher(String path,
-      RecursivePersistListener recursivePersistListener)
-      throws KeeperException.UnimplementedException {
-    throw new KeeperException.UnimplementedException();
+  public void subscribePersistRecursiveListener(String path,
+      RecursivePersistListener recursivePersistListener) {
+    if (!_usePersistWatcher) {
+      throw new UnsupportedOperationException(
+          "Can not subscribe PersistRecursiveWatcher. Persist listener is not enabled.");
+    }
+
+    ManipulateListener addListener = () -> {
+      if (hasChildOrDataListeners(path)) {
+        throw new UnsupportedOperationException(
+            "Can not subscribe PersistRecursiveWatcher. There is an existing listener on " + path);
+      }
+      _zkPathRecursiveWatcherTrie.addRecursiveListener(path, recursivePersistListener);
+      getConnection().addWatch(path, ZkClient.this, AddWatchMode.PERSISTENT_RECURSIVE);

Review Comment:
   Good point! Will add unsubscribe. 





[GitHub] [helix] xyuanlu commented on a diff in pull request #2506: ZkClient add recursive persist listener implementation

Posted by "xyuanlu (via GitHub)" <gi...@apache.org>.
xyuanlu commented on code in PR #2506:
URL: https://github.com/apache/helix/pull/2506#discussion_r1204988534


##########
zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/ZkClient.java:
##########
@@ -1807,6 +1838,23 @@ private void processDataOrChildChange(WatchedEvent event, long notificationTime)
         fireDataChangedEvents(event.getPath(), listeners, OptionalLong.of(notificationTime),
             pathExists, event.getType());
       }
+
+      // fire change event for persist recursive listener
+      if (_usePersistWatcher) {
+        Set<RecursivePersistListener> recListeners =
+            _zkPathRecursiveWatcherTrie.getAllRecursiveListeners(path);
+        if (recListeners != null && !recListeners.isEmpty()) {

Review Comment:
   Yes, recListeners won't be null. It is just defensive coding.





[GitHub] [helix] xyuanlu commented on a diff in pull request #2506: ZkClient add recursive persist listener implementation

Posted by "xyuanlu (via GitHub)" <gi...@apache.org>.
xyuanlu commented on code in PR #2506:
URL: https://github.com/apache/helix/pull/2506#discussion_r1204987914


##########
meta-client/src/main/java/org/apache/helix/metaclient/impl/zk/adapter/ChildListenerAdapter.java:
##########
@@ -39,7 +39,7 @@ public ChildListenerAdapter(ChildChangeListener listener) {
   private static ChildChangeListener.ChangeType convertType(Watcher.Event.EventType eventType) {
     switch (eventType) {
       case NodeCreated: return ChildChangeListener.ChangeType.ENTRY_CREATED;
-      case NodeChildrenChanged: return ChildChangeListener.ChangeType.ENTRY_DATA_CHANGE;

Review Comment:
   Yes. The event type we fire here and what helix manager fires are totally different.
   We add APIs for the persistent recursive listener in ZkClient here. We need to redesign the callback handling logic in helix manager to adopt the persistent recursive listener.
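
A minimal sketch of the kind of mapping under discussion. It assumes, as ZooKeeper's documentation on persistent recursive watches describes, that such watches do not deliver `NodeChildrenChanged` events, which would explain why that case is removed in the hunk above. The enums are local stand-ins; `ENTRY_DELETED` and the `default` branch are illustrative assumptions, not the PR's code.

```java
class EventTypeMappingSketch {
  // Local stand-in for the ZooKeeper event types referenced in the diff.
  enum EventType { NodeCreated, NodeDeleted, NodeDataChanged, NodeChildrenChanged }

  // Local stand-in for the listener-facing change types; ENTRY_DELETED is assumed.
  enum ChangeType { ENTRY_CREATED, ENTRY_DELETED, ENTRY_DATA_CHANGE }

  static ChangeType convertType(EventType eventType) {
    switch (eventType) {
      case NodeCreated:
        return ChangeType.ENTRY_CREATED;
      case NodeDeleted:
        return ChangeType.ENTRY_DELETED;
      case NodeDataChanged:
        return ChangeType.ENTRY_DATA_CHANGE;
      default:
        // NodeChildrenChanged is not expected from a persistent recursive watch.
        throw new IllegalArgumentException("EventType " + eventType + " is not supported.");
    }
  }

  public static void main(String[] args) {
    System.out.println(convertType(EventType.NodeDataChanged));
  }
}
```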





[GitHub] [helix] xyuanlu commented on a diff in pull request #2506: [WIP] ZkClient add recursive persist listener implementation

Posted by "xyuanlu (via GitHub)" <gi...@apache.org>.
xyuanlu commented on code in PR #2506:
URL: https://github.com/apache/helix/pull/2506#discussion_r1205056430


##########
zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/ZkClient.java:
##########
@@ -343,17 +343,48 @@ public void subscribeDataChanges(String path, IZkDataListener listener) {
    * Subscribe RecursivePersistListener for a particular path. User can only subscribe when
    * `_usePersistWatcher` is set to true and there is no pre-existing watcher on the path.
    */
-  // TODO: Add impl and remove exception
-  public boolean subscribePersistRecursiveWatcher(String path,
-      RecursivePersistListener recursivePersistListener)
-      throws KeeperException.UnimplementedException {
-    throw new KeeperException.UnimplementedException();
+  public void subscribePersistRecursiveListener(String path,
+      RecursivePersistListener recursivePersistListener) {
+    if (!_usePersistWatcher) {
+      throw new UnsupportedOperationException(
+          "Can not subscribe PersistRecursiveWatcher. Persist listener is not enabled.");
+    }
+
+    ManipulateListener addListener = () -> {
+      if (hasChildOrDataListeners(path)) {
+        throw new UnsupportedOperationException(
+            "Can not subscribe PersistRecursiveWatcher. There is an existing listener on " + path);
+      }
+      _zkPathRecursiveWatcherTrie.addRecursiveListener(path, recursivePersistListener);
+      getConnection().addWatch(path, ZkClient.this, AddWatchMode.PERSISTENT_RECURSIVE);

Review Comment:
   After some thinking, I think it would be better to move `addRecursiveListener` after `getConnection().addWatch`. If `addWatch` fails, it throws an exception and the listener won't be added to the trie.
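
The ordering argument can be sketched as follows: call the remote side first and record the listener locally only after that call returns, so a failed call leaves the local registry untouched. `SubscribeOrderSketch` and `RemoteWatch` are hypothetical stand-ins for the ZkClient connection and trie, not Helix code.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

class SubscribeOrderSketch {
  // Hypothetical stand-in for the remote call; throws when the watch cannot be added.
  interface RemoteWatch {
    void addWatch(String path);
  }

  private final Map<String, Set<Object>> registry = new HashMap<>();
  private final RemoteWatch remote;

  SubscribeOrderSketch(RemoteWatch remote) {
    this.remote = remote;
  }

  synchronized void subscribe(String path, Object listener) {
    remote.addWatch(path); // may throw; nothing has been recorded yet
    registry.computeIfAbsent(path, p -> new HashSet<>()).add(listener);
  }

  synchronized boolean hasListener(String path) {
    Set<Object> listeners = registry.get(path);
    return listeners != null && !listeners.isEmpty();
  }

  public static void main(String[] args) {
    SubscribeOrderSketch client = new SubscribeOrderSketch(p -> {
      throw new IllegalStateException("cannot add watch on " + p);
    });
    try {
      client.subscribe("/base", new Object());
    } catch (IllegalStateException e) {
      System.out.println("registered after failure: " + client.hasListener("/base"));
    }
  }
}
```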





[GitHub] [helix] qqu0127 commented on a diff in pull request #2506: ZkClient add recursive persist listener implementation

Posted by "qqu0127 (via GitHub)" <gi...@apache.org>.
qqu0127 commented on code in PR #2506:
URL: https://github.com/apache/helix/pull/2506#discussion_r1206132888


##########
meta-client/src/test/java/org/apache/helix/metaclient/impl/zk/TestZkMetaClient.java:
##########
@@ -397,7 +396,77 @@ public void handleDataChange(String key, Object data, ChangeType changeType)
       zkMetaClient.unsubscribeDataChange(basePath, listener);
       // verify that no listener is registered on any path
       watchers = TestUtil.getZkWatch(zkMetaClient.getZkClient());
+      Assert.assertEquals(watchers.get("persistentWatches").size(), 0);
+      Assert.assertEquals(watchers.get("childWatches").size(), 0);
+      Assert.assertEquals(watchers.get("dataWatches").size(), 0);
+
+    }
+  }
+
+  @Test
+  public void testChildChangeListener() throws Exception {
+    final String basePath = "/TestZkMetaClient_testChildChangeListener";
+    final int count = 100;
+    try (ZkMetaClient<String> zkMetaClient = createZkMetaClient()) {
+      zkMetaClient.connect();
+      CountDownLatch countDownLatch = new CountDownLatch(count*4);
+      ChildChangeListener listener = new ChildChangeListener() {
+
+        @Override
+        public void handleChildChange(String changedPath, ChangeType changeType) throws Exception {
+          countDownLatch.countDown();
+
+        }
+      };
+      zkMetaClient.create(basePath, "");
+      Assert.assertTrue(
+          zkMetaClient.subscribeChildChanges(basePath, listener, false)
+      );
+
+      DataChangeListener dummyDataListener = new DataChangeListener() {
+        @Override
+        public void handleDataChange(String key, Object data, ChangeType changeType)
+            throws Exception {
+        }
+      };
+      try {
+        zkMetaClient.subscribeDataChange(basePath, dummyDataListener, false);
+      } catch ( Exception ex) {
+        Assert.assertEquals(ex.getClass().getName(), "java.lang.UnsupportedOperationException");

Review Comment:
   Nit: you can catch only the expected exception type, right? You don't even need the catch-all exception block.
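
One further point about the quoted test: a `try`/`catch` whose only assertion sits inside the `catch` block passes silently when nothing is thrown. A minimal sketch of a stricter check follows. The `expectThrows` helper here is a hypothetical plain-Java version; TestNG's `Assert.expectThrows` may be usable instead if the TestNG version in use provides it.

```java
class ExpectThrowsSketch {
  interface ThrowingRunnable {
    void run() throws Exception;
  }

  // Fails if the body completes normally or throws a different exception type.
  static <T extends Throwable> T expectThrows(Class<T> type, ThrowingRunnable body) {
    try {
      body.run();
    } catch (Throwable t) {
      if (type.isInstance(t)) {
        return type.cast(t);
      }
      throw new AssertionError("Expected " + type.getName() + " but got " + t, t);
    }
    throw new AssertionError("Expected " + type.getName() + " but nothing was thrown");
  }

  public static void main(String[] args) {
    UnsupportedOperationException ex =
        expectThrows(UnsupportedOperationException.class, () -> {
          throw new UnsupportedOperationException("existing listener on /base");
        });
    System.out.println(ex.getMessage());
  }
}
```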



##########
zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/util/ZkPathRecursiveWatcherTrie.java:
##########
@@ -224,6 +223,28 @@ public void removeRecursiveListener(final String path, RecursivePersistListener
     }
   }
 
+  /**
+   * Return if there is listener on a particular path
+   * @param path
+   * @return
+   */
+  public Boolean hasListenerOnPath(String path) {

Review Comment:
   Why boxed boolean?
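
A generic illustration of the concern behind this question, not a claim that the PR's method ever returned null: a boxed `Boolean` return type admits `null`, and auto-unboxing a `null` throws `NullPointerException`. The class and method names below are hypothetical.

```java
import java.util.HashMap;
import java.util.Map;

class BoxedBooleanSketch {
  // Boxed return type: the result can be null, so callers must handle three states.
  static Boolean boxedHasListener(Map<String, Boolean> registry, String path) {
    return registry.get(path);
  }

  // Primitive return type: the result is always true or false.
  static boolean primitiveHasListener(Map<String, Boolean> registry, String path) {
    return Boolean.TRUE.equals(registry.get(path));
  }

  public static void main(String[] args) {
    Map<String, Boolean> registry = new HashMap<>();
    registry.put("/base", true);

    System.out.println(primitiveHasListener(registry, "/missing"));
    try {
      if (boxedHasListener(registry, "/missing")) { // auto-unboxing null throws here
        System.out.println("unreachable");
      }
    } catch (NullPointerException e) {
      System.out.println("NullPointerException from unboxing a null Boolean");
    }
  }
}
```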



##########
zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/ZkClient.java:
##########
@@ -343,17 +343,54 @@ public void subscribeDataChanges(String path, IZkDataListener listener) {
    * Subscribe RecursivePersistListener for a particular path. User can only subscribe when
    * `_usePersistWatcher` is set to true and there is no pre-existing watcher on the path.
    */
-  // TODO: Add impl and remove exception
-  public boolean subscribePersistRecursiveWatcher(String path,
-      RecursivePersistListener recursivePersistListener)
-      throws KeeperException.UnimplementedException {
-    throw new KeeperException.UnimplementedException();
+  public void subscribePersistRecursiveListener(String path,
+      RecursivePersistListener recursivePersistListener) {
+    if (!_usePersistWatcher) {
+      throw new UnsupportedOperationException(
+          "Can not subscribe PersistRecursiveWatcher. Persist listener is not enabled.");
+    }
+
+    ManipulateListener addListener = () -> {
+      if (hasChildOrDataListeners(path)) {
+        throw new UnsupportedOperationException(
+            "Can not subscribe PersistRecursiveWatcher. There is an existing listener on " + path);
+      }
+      // subscribe a PERSISTENT_RECURSIVE listener on path. It throws exception if not successful
+      retryUntilConnected(() -> {
+        getConnection().addWatch(path, ZkClient.this, AddWatchMode.PERSISTENT_RECURSIVE);
+        return null;
+      });
+
+      _zkPathRecursiveWatcherTrie.addRecursiveListener(path, recursivePersistListener);
+    };
+
+    executeWithInPersistListenerMutex(addListener);
   }
 
-  public boolean unsubscribePersistRecursiveWatcher(String path,
-      RecursivePersistListener recursivePersistListener)
-      throws KeeperException.UnimplementedException {
-    throw new KeeperException.UnimplementedException();
+  public void unsubscribePersistRecursiveListener(String path,
+      RecursivePersistListener recursivePersistListener) {
+    if (!_usePersistWatcher) {
+      throw new UnsupportedOperationException(
+          "Can not subscribe PersistRecursiveWatcher. Persist listener is not enabled.");
+    }
+
+    // unsubscribe from ZK if this is the only recursive persist listener on this path.
+    ManipulateListener removeListeners = () -> {
+      _zkPathRecursiveWatcherTrie.removeRecursiveListener(path, recursivePersistListener);
+      if (_zkPathRecursiveWatcherTrie.hasListenerOnPath(path)) {
+        return;
+      }
+      try {
+        // We are not checking if there is a persist listener registered on the path because
+        // we do not allow subscribe a persist listener on the same path of persist recursive
+        // listener as of now.
+        getConnection().removeWatches(path, this, WatcherType.Any);
+      } catch (KeeperException.NoWatcherException e) {
+        LOG.warn("Persist watcher is already removed");

Review Comment:
   Nit: let's add more information, such as the path and the exception; otherwise the log is hard to interpret.



##########
meta-client/src/main/java/org/apache/helix/metaclient/impl/zk/ZkMetaClient.java:
##########
@@ -332,15 +333,18 @@ public boolean subscribeStateChanges(ConnectStateChangeListener listener) {
     return true;
   }
 
-  // TODO: add impl and remove UnimplementedException
   @Override
   public boolean subscribeChildChanges(String key, ChildChangeListener listener, boolean skipWatchingNonExistNode) {
+    if (skipWatchingNonExistNode && exists(key) == null) {
+      return false;
+    }
     try {
-      _zkClient.subscribePersistRecursiveWatcher(key, new ChildListenerAdapter(listener));
-    } catch (KeeperException.UnimplementedException e) {
-      LOG.error(e.getLocalizedMessage());
+      _zkClient.subscribePersistRecursiveListener(key, new ChildListenerAdapter(listener));
+    } catch (ZkException ex) {
+      LOG.error("Failed to subscribe ChildChanges for path: " + key, ex);

Review Comment:
   Open question: what's the idea behind catching the exception vs. failing out? It seems that for the subscribe method we catch the ZK exception, but for unsubscribe we don't.



##########
meta-client/src/test/java/org/apache/helix/metaclient/impl/zk/TestZkMetaClient.java:
##########
@@ -397,7 +396,77 @@ public void handleDataChange(String key, Object data, ChangeType changeType)
       zkMetaClient.unsubscribeDataChange(basePath, listener);
       // verify that no listener is registered on any path
       watchers = TestUtil.getZkWatch(zkMetaClient.getZkClient());
+      Assert.assertEquals(watchers.get("persistentWatches").size(), 0);
+      Assert.assertEquals(watchers.get("childWatches").size(), 0);
+      Assert.assertEquals(watchers.get("dataWatches").size(), 0);
+
+    }
+  }
+
+  @Test
+  public void testChildChangeListener() throws Exception {
+    final String basePath = "/TestZkMetaClient_testChildChangeListener";
+    final int count = 100;
+    try (ZkMetaClient<String> zkMetaClient = createZkMetaClient()) {
+      zkMetaClient.connect();
+      CountDownLatch countDownLatch = new CountDownLatch(count*4);
+      ChildChangeListener listener = new ChildChangeListener() {
+
+        @Override
+        public void handleChildChange(String changedPath, ChangeType changeType) throws Exception {
+          countDownLatch.countDown();
+
+        }
+      };
+      zkMetaClient.create(basePath, "");
+      Assert.assertTrue(
+          zkMetaClient.subscribeChildChanges(basePath, listener, false)
+      );
+
+      DataChangeListener dummyDataListener = new DataChangeListener() {
+        @Override
+        public void handleDataChange(String key, Object data, ChangeType changeType)
+            throws Exception {
+        }
+      };
+      try {
+        zkMetaClient.subscribeDataChange(basePath, dummyDataListener, false);
+      } catch ( Exception ex) {
+        Assert.assertEquals(ex.getClass().getName(), "java.lang.UnsupportedOperationException");
+      }
+
+      DirectChildChangeListener dummyCldListener = new DirectChildChangeListener() {
+        @Override
+        public void handleDirectChildChange(String key) throws Exception {
+
+        }
+      };
+      try {
+        zkMetaClient.subscribeDirectChildChange(basePath, dummyCldListener, false);
+      } catch ( Exception ex) {
+        Assert.assertEquals(ex.getClass().getName(), "java.lang.UnsupportedOperationException");
+      }
+
+      // Verify no one time watcher is registered. Only one persist listener is registered.
+      Map<String, List<String>> watchers = TestUtil.getZkWatch(zkMetaClient.getZkClient());
+      Assert.assertEquals(watchers.get("persistentRecursiveWatches").size(), 1);
+      Assert.assertEquals(watchers.get("persistentRecursiveWatches").get(0), basePath);
+      Assert.assertEquals(watchers.get("persistentWatches").size(), 0);
+      Assert.assertEquals(watchers.get("childWatches").size(), 0);
+      Assert.assertEquals(watchers.get("dataWatches").size(), 0);
+
+      for (int i=0; i<count; ++i) {
+        zkMetaClient.set(basePath, "data7" + i, -1);
+        zkMetaClient.create(basePath+"/c1_" +i , "datat");
+        zkMetaClient.create(basePath+"/c1_" +i + "/c2", "datat");
+        zkMetaClient.delete(basePath+"/c1_" +i + "/c2");
+      }
+      Assert.assertTrue(countDownLatch.await(50000000, TimeUnit.MILLISECONDS));

Review Comment:
   Is the timeout too long?
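
For scale, 50000000 ms is 50,000 seconds, or roughly 13.9 hours. A minimal sketch of a bounded wait on a `CountDownLatch`, assuming a timeout of a few seconds is enough for the listener callbacks; the class name, helper, and timeout values are illustrative and not taken from the PR.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

public class BoundedLatchWaitSketch {

  // Counts the latch down `fired` times on a background thread, then waits at
  // most timeoutMs for it to reach zero. Returns false on timeout or interrupt.
  static boolean awaitEvents(int expected, int fired, long timeoutMs) {
    CountDownLatch latch = new CountDownLatch(expected);
    Thread notifier = new Thread(() -> {
      for (int i = 0; i < fired; i++) {
        latch.countDown();
      }
    });
    notifier.start();
    try {
      boolean done = latch.await(timeoutMs, TimeUnit.MILLISECONDS);
      notifier.join();
      return done;
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      return false;
    }
  }
}
```

With a short timeout, a missing notification fails the test quickly instead of leaving it hanging for hours.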



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@helix.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@helix.apache.org
For additional commands, e-mail: reviews-help@helix.apache.org


[GitHub] [helix] xyuanlu commented on a diff in pull request #2506: ZkClient add recursive persist listener implementation

Posted by "xyuanlu (via GitHub)" <gi...@apache.org>.
xyuanlu commented on code in PR #2506:
URL: https://github.com/apache/helix/pull/2506#discussion_r1212106696


##########
zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/ZkClient.java:
##########
@@ -343,17 +343,54 @@ public void subscribeDataChanges(String path, IZkDataListener listener) {
    * Subscribe RecursivePersistListener for a particular path. User can only subscribe when
    * `_usePersistWatcher` is set to true and there is no pre-existing watcher on the path.
    */
-  // TODO: Add impl and remove exception
-  public boolean subscribePersistRecursiveWatcher(String path,
-      RecursivePersistListener recursivePersistListener)
-      throws KeeperException.UnimplementedException {
-    throw new KeeperException.UnimplementedException();
+  public void subscribePersistRecursiveListener(String path,
+      RecursivePersistListener recursivePersistListener) {
+    if (!_usePersistWatcher) {

Review Comment:
   TFTR! 
   We return `false` in ZkMetaClient on the expected-failure path (subscribing to a nonexistent path while `skipWatchingNonExistNode` is true), so nothing unexpected has happened in that case.
   When `subscribePersistRecursiveListener` throws, native ZK was unable to process the request, which we treat as an exceptional case, so we propagate the exception to the user.
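
The contract described here can be sketched roughly as follows. This is a hypothetical in-memory model, not the ZkMetaClient code: the class, its fields, and the `create` helper are made up for illustration. Only the split between returning `false` for the expected failure and throwing for the unexpected one mirrors the comment.

```java
import java.util.HashSet;
import java.util.Set;

public class SubscribeContractSketch {
  private final Set<String> existingPaths = new HashSet<>();
  private final Set<String> watchedPaths = new HashSet<>();
  private final boolean persistWatcherEnabled;

  SubscribeContractSketch(boolean persistWatcherEnabled) {
    this.persistWatcherEnabled = persistWatcherEnabled;
  }

  // Hypothetical helper that marks a path as existing.
  void create(String path) {
    existingPaths.add(path);
  }

  boolean subscribe(String path, boolean skipWatchingNonExistNode) {
    // Expected failure: the caller asked to skip nonexistent nodes.
    if (skipWatchingNonExistNode && !existingPaths.contains(path)) {
      return false;
    }
    // Unexpected failure: the request cannot be processed, so throw.
    if (!persistWatcherEnabled) {
      throw new UnsupportedOperationException("Persist listener is not enabled.");
    }
    watchedPaths.add(path);
    return true;
  }
}
```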





[GitHub] [helix] xyuanlu commented on a diff in pull request #2506: ZkClient add recursive persist listener implementation

Posted by "xyuanlu (via GitHub)" <gi...@apache.org>.
xyuanlu commented on code in PR #2506:
URL: https://github.com/apache/helix/pull/2506#discussion_r1212416105


##########
zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/ZkClient.java:
##########
@@ -343,17 +343,54 @@ public void subscribeDataChanges(String path, IZkDataListener listener) {
    * Subscribe RecursivePersistListener for a particular path. User can only subscribe when
    * `_usePersistWatcher` is set to true and there is no pre-existing watcher on the path.
    */
-  // TODO: Add impl and remove exception
-  public boolean subscribePersistRecursiveWatcher(String path,
-      RecursivePersistListener recursivePersistListener)
-      throws KeeperException.UnimplementedException {
-    throw new KeeperException.UnimplementedException();
+  public void subscribePersistRecursiveListener(String path,
+      RecursivePersistListener recursivePersistListener) {
+    if (!_usePersistWatcher) {
+      throw new UnsupportedOperationException(
+          "Can not subscribe PersistRecursiveWatcher. Persist listener is not enabled.");
+    }
+
+    ManipulateListener addListener = () -> {
+      if (hasChildOrDataListeners(path)) {
+        throw new UnsupportedOperationException(
+            "Can not subscribe PersistRecursiveWatcher. There is an existing listener on " + path);
+      }
+      // subscribe a PERSISTENT_RECURSIVE listener on path. It throws exception if not successful
+      retryUntilConnected(() -> {
+        getConnection().addWatch(path, ZkClient.this, AddWatchMode.PERSISTENT_RECURSIVE);
+        return null;

Review Comment:
   `retryUntilConnected` expects the callable to return a value of type T, so we need to return something (here, `null`).
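
A minimal sketch of why the `return null;` is needed, assuming the retry helper takes a `Callable<T>`. `retry` below is a hypothetical stand-in and not the actual `retryUntilConnected` implementation; `addWatch` only records the path to simulate a side effect.

```java
import java.util.concurrent.Callable;

public class RetryCallableSketch {

  // Hypothetical stand-in for a retry helper: runs the callable and retries on
  // failure, up to maxAttempts times.
  static <T> T retry(Callable<T> callable, int maxAttempts) {
    if (maxAttempts < 1) {
      throw new IllegalArgumentException("maxAttempts must be at least 1");
    }
    Exception last = null;
    for (int attempt = 0; attempt < maxAttempts; attempt++) {
      try {
        return callable.call();
      } catch (Exception e) {
        last = e;
      }
    }
    throw new IllegalStateException("all attempts failed", last);
  }

  // The action is performed only for its side effect, but Callable.call() must
  // return a value, so the lambda returns null.
  static Object addWatch(StringBuilder log, String path) {
    return retry(() -> {
      log.append(path);
      return null;
    }, 3);
  }
}
```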





[GitHub] [helix] xyuanlu commented on pull request #2506: ZkClient add recursive persist listener implementation

Posted by "xyuanlu (via GitHub)" <gi...@apache.org>.
xyuanlu commented on PR #2506:
URL: https://github.com/apache/helix/pull/2506#issuecomment-1571484839

   This change is ready to be merged. Approved by @desaikomal  // Thanks!
   commit message: ZkClient add recursive persist listener implementation




[GitHub] [helix] xyuanlu commented on a diff in pull request #2506: ZkClient add recursive persist listener implementation

Posted by "xyuanlu (via GitHub)" <gi...@apache.org>.
xyuanlu commented on code in PR #2506:
URL: https://github.com/apache/helix/pull/2506#discussion_r1204986484


##########
zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/ZkClient.java:
##########
@@ -1807,6 +1838,23 @@ private void processDataOrChildChange(WatchedEvent event, long notificationTime)
         fireDataChangedEvents(event.getPath(), listeners, OptionalLong.of(notificationTime),
             pathExists, event.getType());
       }
+
+      // fire change event for persist recursive listener
+      if (_usePersistWatcher) {
+        Set<RecursivePersistListener> recListeners =
+            _zkPathRecursiveWatcherTrie.getAllRecursiveListeners(path);
+        if (recListeners != null && !recListeners.isEmpty()) {
+          for (final RecursivePersistListener listener : recListeners) {

Review Comment:
   I am not sure about "zk event threads", since the event thread executor is a single-thread executor.





[GitHub] [helix] xyuanlu commented on a diff in pull request #2506: ZkClient add recursive persist listener implementation

Posted by "xyuanlu (via GitHub)" <gi...@apache.org>.
xyuanlu commented on code in PR #2506:
URL: https://github.com/apache/helix/pull/2506#discussion_r1204990993


##########
zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/ZkClient.java:
##########
@@ -3030,8 +3078,16 @@ interface ManipulateListener {
     void run() throws KeeperException, InterruptedException;
   }
 
+  // Add a persist listener on the path.
+  // Throws UnsupportedOperationException if there is already a recursive persist listener on the
+  // path because it will overwrite that recursive persist listener.
   private void addPersistListener(String path, Object listener) {
     ManipulateListener addListeners = () -> {
+      if (_zkPathRecursiveWatcherTrie.hasListenerOnPath(path)) {

Review Comment:
   Sorry, I didn't quite follow. We check whether there is a persistent recursive listener on the path when we
   1. unsubscribe a persistent recursive listener
   2. add a persist listener (here)
   Did I miss anything?





[GitHub] [helix] xyuanlu commented on a diff in pull request #2506: ZkClient add recursive persist listener implementation

Posted by "xyuanlu (via GitHub)" <gi...@apache.org>.
xyuanlu commented on code in PR #2506:
URL: https://github.com/apache/helix/pull/2506#discussion_r1207540203


##########
meta-client/src/main/java/org/apache/helix/metaclient/impl/zk/ZkMetaClient.java:
##########
@@ -332,15 +333,18 @@ public boolean subscribeStateChanges(ConnectStateChangeListener listener) {
     return true;
   }
 
-  // TODO: add impl and remove UnimplementedException
   @Override
   public boolean subscribeChildChanges(String key, ChildChangeListener listener, boolean skipWatchingNonExistNode) {
+    if (skipWatchingNonExistNode && exists(key) == null) {
+      return false;
+    }
     try {
-      _zkClient.subscribePersistRecursiveWatcher(key, new ChildListenerAdapter(listener));
-    } catch (KeeperException.UnimplementedException e) {
-      LOG.error(e.getLocalizedMessage());
+      _zkClient.subscribePersistRecursiveListener(key, new ChildListenerAdapter(listener));
+    } catch (ZkException ex) {
+      LOG.error("Failed to subscribe ChildChanges for path: " + key, ex);

Review Comment:
   Good point! 
   I updated the function. It returns `false` when the failure is expected (the path does not exist and `skipWatchingNonExistNode` is true), and it throws an exception when the failure is not expected.
    


