You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by xy...@apache.org on 2023/04/27 20:36:29 UTC

[helix] 20/37: Create separate API for persistent and one-time listener (#2376)

This is an automated email from the ASF dual-hosted git repository.

xyuanlu pushed a commit to branch metaclient
in repository https://gitbox.apache.org/repos/asf/helix.git

commit 5ed14aefceea8064fbe369b1b200495fe9f2de9d
Author: Qi (Quincy) Qu <qq...@linkedin.com>
AuthorDate: Wed Feb 15 14:20:34 2023 -0500

    Create separate API for persistent and one-time listener (#2376)
    
    Create separate API for persistent and one-time listener
---
 .../helix/metaclient/api/MetaClientInterface.java  | 75 +++++++++++++++++-----
 .../helix/metaclient/impl/zk/ZkMetaClient.java     | 18 ++----
 .../helix/metaclient/impl/zk/TestZkMetaClient.java |  8 +--
 3 files changed, 68 insertions(+), 33 deletions(-)

diff --git a/meta-client/src/main/java/org/apache/helix/metaclient/api/MetaClientInterface.java b/meta-client/src/main/java/org/apache/helix/metaclient/api/MetaClientInterface.java
index dc310b0ef..a4c5113f2 100644
--- a/meta-client/src/main/java/org/apache/helix/metaclient/api/MetaClientInterface.java
+++ b/meta-client/src/main/java/org/apache/helix/metaclient/api/MetaClientInterface.java
@@ -22,6 +22,7 @@ package org.apache.helix.metaclient.api;
 import java.util.List;
 import java.util.concurrent.TimeUnit;
 
+import org.apache.commons.lang3.NotImplementedException;
 import org.apache.helix.metaclient.exception.MetaClientInterruptException;
 import org.apache.helix.metaclient.exception.MetaClientTimeoutException;
 
@@ -378,61 +379,105 @@ public interface MetaClientInterface<T> {
   /**
    * Subscribe change of a particular entry. Including entry data change, entry deletion and creation
    * of the given key.
+   * The listener should be permanent until it's unsubscribed.
    * @param key Key to identify the entry
    * @param listener An implementation of DataChangeListener
    *                 @see org.apache.helix.metaclient.api.DataChangeListener
    * @param skipWatchingNonExistNode Will not register lister to an non-exist key if set to true.
    *                                 Please set to false if you are expecting ENTRY_CREATED type.
-   * @param persistListener The listener will persist when set to true. Otherwise it will be a one
-   *                        time triggered listener.
    * @return Return an boolean indication if subscribe succeeded.
    */
-  boolean subscribeDataChange(String key, DataChangeListener listener,
-       boolean skipWatchingNonExistNode, boolean persistListener);
+  boolean subscribeDataChange(String key, DataChangeListener listener, boolean skipWatchingNonExistNode);
+
+  /**
+   * Subscribe a one-time change of a particular entry. Including entry data change, entry deletion and creation
+   * of the given key.
+   * The implementation should use at-most-once delivery semantics.
+   * @param key Key to identify the entry
+   * @param listener An implementation of DataChangeListener
+   *                 @see org.apache.helix.metaclient.api.DataChangeListener
+   * @param skipWatchingNonExistNode Will not register the listener to a non-existent key if set to true.
+   *                                 Please set to false if you are expecting ENTRY_CREATED type.
+   * @return Return a boolean indicating whether the subscription succeeded.
+   */
+  default boolean subscribeOneTimeDataChange(String key, DataChangeListener listener,
+      boolean skipWatchingNonExistNode) {
+    throw new NotImplementedException("subscribeOneTimeDataChange is not implemented");
+  }
 
   /**
    * Subscribe for direct child change event on a particular key. It includes new child
    * creation or deletion. It does not include existing child data change.
+   * The listener should be permanent until it's unsubscribed.
    * For hierarchy key spaces like zookeeper, it refers to an entry's direct children nodes.
    * For flat key spaces, it refers to keys that matches `prefix*separator`.
    * @param key key to identify the entry.
    * @param listener An implementation of DirectSubEntryChangeListener.
    *                 @see org.apache.helix.metaclient.api.DirectChildChangeListener
    * @param skipWatchingNonExistNode If the passed in key does not exist, no listener wil be registered.
-   * @param persistListener The listener will persist when set to true. Otherwise it will be a one
-   *                        time triggered listener.
    *
    * @return Return an DirectSubEntrySubscribeResult. It will contain a list of direct sub children if
    *         subscribe succeeded.
    */
   DirectChildSubscribeResult subscribeDirectChildChange(String key,
-      DirectChildChangeListener listener, boolean skipWatchingNonExistNode,
-      boolean persistListener);
+      DirectChildChangeListener listener, boolean skipWatchingNonExistNode);
+
+  /**
+   * Subscribe for a one-time direct child change event on a particular key. It includes new child
+   * creation or deletion. It does not include existing child data change.
+   * The implementation should use at-most-once delivery semantics.
+   * For hierarchy key spaces like zookeeper, it refers to an entry's direct children nodes.
+   * For flat key spaces, it refers to keys that matches `prefix*separator`.
+   *
+   * @param key key to identify the entry.
+   * @param listener An implementation of DirectChildChangeListener.
+   *                 @see org.apache.helix.metaclient.api.DirectChildChangeListener
+   * @param skipWatchingNonExistNode If the passed in key does not exist, no listener will be registered.
+   *
+   * @return Return a DirectChildSubscribeResult. It will contain a list of direct sub children if
+   *         subscribe succeeded.
+   */
+  default DirectChildSubscribeResult subscribeOneTimeDirectChildChange(String key,
+      DirectChildChangeListener listener, boolean skipWatchingNonExistNode) {
+    throw new NotImplementedException("subscribeOneTimeDirectChildChange is not implemented");
+  }
 
   /**
    * Subscribe for connection state change.
+   * The listener should be permanent until it's unsubscribed.
    * @param listener An implementation of ConnectStateChangeListener.
    *                 @see org.apache.helix.metaclient.api.ConnectStateChangeListener
-   * @param persistListener The listener will persist when set to true. Otherwise it will be a one
-   *                        time triggered listener.
    *
    * @return Return an boolean indication if subscribe succeeded.
    */
-  boolean subscribeStateChanges(ConnectStateChangeListener listener, boolean persistListener);
+  boolean subscribeStateChanges(ConnectStateChangeListener listener);
 
   /**
    * Subscribe change for all children including entry change and data change.
+   * The listener should be permanent until it's unsubscribed.
+   * For hierarchy key spaces like zookeeper, it would watch the whole tree structure.
+   * For flat key spaces, it would watch for keys with certain prefix.
+   * @param key key to identify the entry.
+   * @param listener An implementation of ChildChangeListener.
+   *                 @see org.apache.helix.metaclient.api.ChildChangeListener
+   * @param skipWatchingNonExistNode If the passed in key does not exist, no listener will be registered.
+   */
+  boolean subscribeChildChanges(String key, ChildChangeListener listener, boolean skipWatchingNonExistNode);
+
+  /**
+   * Subscribe a one-time change for all children including entry change and data change.
+   * The implementation should use at-most-once delivery semantics.
    * For hierarchy key spaces like zookeeper, it would watch the whole tree structure.
    * For flat key spaces, it would watch for keys with certain prefix.
    * @param key key to identify the entry.
    * @param listener An implementation of ChildChangeListener.
    *                 @see org.apache.helix.metaclient.api.ChildChangeListener
    * @param skipWatchingNonExistNode If the passed in key does not exist, no listener wil be registered.
-   * @param persistListener The listener will persist when set to true. Otherwise it will be a one
-   *                        time triggered listener.
    */
-  boolean subscribeChildChanges(String key, ChildChangeListener listener,
-      boolean skipWatchingNonExistNode, boolean persistListener);
+  default boolean subscribeOneTimeChildChanges(String key, ChildChangeListener listener,
+      boolean skipWatchingNonExistNode) {
+    throw new NotImplementedException("subscribeOneTimeChildChanges is not implemented");
+  }
 
   /**
    * Unsubscribe the listener to further changes. No-op if the listener is not subscribed to the key.
diff --git a/meta-client/src/main/java/org/apache/helix/metaclient/impl/zk/ZkMetaClient.java b/meta-client/src/main/java/org/apache/helix/metaclient/impl/zk/ZkMetaClient.java
index fea40bcde..e042368d3 100644
--- a/meta-client/src/main/java/org/apache/helix/metaclient/impl/zk/ZkMetaClient.java
+++ b/meta-client/src/main/java/org/apache/helix/metaclient/impl/zk/ZkMetaClient.java
@@ -253,36 +253,26 @@ public class ZkMetaClient<T> implements MetaClientInterface<T>, AutoCloseable {
   }
 
   @Override
-  public boolean subscribeDataChange(String key, DataChangeListener listener,
-      boolean skipWatchingNonExistNode, boolean persistListener) {
-    if (!persistListener) {
-      throw new NotImplementedException("Currently the non-persist (one-time) listener is not supported in ZkMetaClient.");
-    }
+  public boolean subscribeDataChange(String key, DataChangeListener listener, boolean skipWatchingNonExistNode) {
     _zkClient.subscribeDataChanges(key, new DataListenerAdapter(listener));
     return false;
   }
 
   @Override
   public DirectChildSubscribeResult subscribeDirectChildChange(String key,
-      DirectChildChangeListener listener, boolean skipWatchingNonExistNode,
-      boolean persistListener) {
-    if (!persistListener) {
-      throw new NotImplementedException("Currently the non-persist (one-time) listener is not supported in ZkMetaClient.");
-    }
+      DirectChildChangeListener listener, boolean skipWatchingNonExistNode) {
     ChildrenSubscribeResult result =
         _zkClient.subscribeChildChanges(key, new DirectChildListenerAdapter(listener), skipWatchingNonExistNode);
     return new DirectChildSubscribeResult(result.getChildren(), result.isInstalled());
   }
 
   @Override
-  public boolean subscribeStateChanges(ConnectStateChangeListener listener,
-      boolean persistListener) {
+  public boolean subscribeStateChanges(ConnectStateChangeListener listener) {
     return false;
   }
 
   @Override
-  public boolean subscribeChildChanges(String key, ChildChangeListener listener,
-      boolean skipWatchingNonExistNode, boolean persistListener) {
+  public boolean subscribeChildChanges(String key, ChildChangeListener listener, boolean skipWatchingNonExistNode) {
     return false;
   }
 
diff --git a/meta-client/src/test/java/org/apache/helix/metaclient/impl/zk/TestZkMetaClient.java b/meta-client/src/test/java/org/apache/helix/metaclient/impl/zk/TestZkMetaClient.java
index 76ac4b0b3..6cccfae72 100644
--- a/meta-client/src/test/java/org/apache/helix/metaclient/impl/zk/TestZkMetaClient.java
+++ b/meta-client/src/test/java/org/apache/helix/metaclient/impl/zk/TestZkMetaClient.java
@@ -222,7 +222,7 @@ public class TestZkMetaClient {
     try (ZkMetaClient<String> zkMetaClient = createZkMetaClient()) {
       zkMetaClient.connect();
       MockDataChangeListener listener = new MockDataChangeListener();
-      zkMetaClient.subscribeDataChange(path, listener, false, true);
+      zkMetaClient.subscribeDataChange(path, listener, false);
       zkMetaClient.create(path, "test-node");
       int expectedCallCount = 0;
       synchronized (_syncObject) {
@@ -260,7 +260,7 @@ public class TestZkMetaClient {
       }
       // register a new non-persistent listener
       try {
-        zkMetaClient.subscribeDataChange(path, new MockDataChangeListener(), false, false);
+        zkMetaClient.subscribeOneTimeDataChange(path, new MockDataChangeListener(), false);
         Assert.fail("One-time listener is not supported, NotImplementedException should be thrown.");
       } catch (NotImplementedException e) {
         // expected
@@ -293,7 +293,7 @@ public class TestZkMetaClient {
             }
           };
           listeners.get(path).add(listener);
-          zkMetaClient.subscribeDataChange(path, listener, false, true);
+          zkMetaClient.subscribeDataChange(path, listener, false);
         }
       }
       zkMetaClient.set(basePath + "_1", testData, -1);
@@ -317,7 +317,7 @@ public class TestZkMetaClient {
       };
       zkMetaClient.create(basePath, "");
       Assert.assertTrue(
-          zkMetaClient.subscribeDirectChildChange(basePath, listener, false, true)
+          zkMetaClient.subscribeDirectChildChange(basePath, listener, false)
               .isRegistered());
       zkMetaClient.create(basePath + "/child_1", "test-data");
       //TODO: the native zkclient failed to provide persistent listener, and event might be lost.