Posted to reviews@helix.apache.org by GitBox <gi...@apache.org> on 2023/01/10 22:26:11 UTC

[GitHub] [helix] qqu0127 opened a new pull request, #2338: Implement a thread-safe listener container for zkclient

qqu0127 opened a new pull request, #2338:
URL: https://github.com/apache/helix/pull/2338

   ### Issues
   
   - [ ] My PR addresses the following Helix issues and references them in the PR description:
   N.A.
   
   ### Description
   
   - [X] Here are some details about my PR, including screenshots of any UI changes:
   Implement a thread-safe listener container for zkclient listener management.
   This is introduced to better support metaclient implementation, where:
   1. New functionality is needed for the "one-time" listener introduced in metaclient (currently, all listeners in zkclient are "persistent")
   2. Better encapsulation and code refactoring
   
   This PR adds the main class `ListenerContainer`, which contains the core functionality.
   The next PR will have zkclient adopt this new component and then implement the new functions for one-time listeners.
   
   ### Tests
   
   - [X] The following tests are written for this issue:
   
   `TestListenerContainer`
   
   - The following is the result of the "mvn test" command on the appropriate module:
   [INFO] Tests run: 78, Failures: 0, Errors: 0, Skipped: 0, Time elapsed: 1,115.267 s - in TestSuite
   [INFO] 
   [INFO] Results:
   [INFO] 
   [INFO] Tests run: 78, Failures: 0, Errors: 0, Skipped: 0
   [INFO] 
   [INFO] 
   [INFO] --- jacoco-maven-plugin:0.8.6:report (generate-code-coverage-report) @ zookeeper-api ---
   [INFO] Loading execution data file /Users/qqu/workspace/qqu-helix/zookeeper-api/target/jacoco.exec
   [INFO] Analyzed bundle 'Apache Helix :: ZooKeeper API' with 116 classes
   [INFO] ------------------------------------------------------------------------
   [INFO] BUILD SUCCESS
   [INFO] ------------------------------------------------------------------------
   [INFO] Total time:  18:50 min
   [INFO] Finished at: 2023-01-10T14:48:02-05:00
   [INFO] ------------------------------------------------------------------------
   
   
   ### Changes that Break Backward Compatibility (Optional)
   
   - My PR contains changes that break backward compatibility or previous assumptions for certain methods or API. They include:
   
   (Consider including all behavior changes for public methods or API. Also include these changes in merge description so that other developers are aware of these changes. This allows them to make relevant code changes in feature branches accounting for the new method/API behavior.)
   
   ### Documentation (Optional)
   
   - In case of new functionality, my PR adds documentation in the following wiki page:
   
   (Link the GitHub wiki you added)
   
   ### Commits
   
   - My commits all reference appropriate Apache Helix GitHub issues in their subject lines. In addition, my commits follow the guidelines from "[How to write a good git commit message](http://chris.beams.io/posts/git-commit/)":
     1. Subject is separated from body by a blank line
     1. Subject is limited to 50 characters (not including Jira issue reference)
     1. Subject does not end with a period
     1. Subject uses the imperative mood ("add", not "adding")
     1. Body wraps at 72 characters
     1. Body explains "what" and "why", not "how"
   
   ### Code Quality
   
   - My diff has been formatted using helix-style.xml 
   (helix-style-intellij.xml if IntelliJ IDE is used)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@helix.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@helix.apache.org
For additional commands, e-mail: reviews-help@helix.apache.org


[GitHub] [helix] desaikomal commented on pull request #2338: Implement a thread-safe listener container for zkclient

Posted by "desaikomal (via GitHub)" <gi...@apache.org>.
desaikomal commented on PR #2338:
URL: https://github.com/apache/helix/pull/2338#issuecomment-1539384652

   @qqu0127 - is this PR still relevant? If not, can we please close it? Thanks.




[GitHub] [helix] jiajunwang commented on a diff in pull request #2338: Implement a thread-safe listener container for zkclient

Posted by GitBox <gi...@apache.org>.
jiajunwang commented on code in PR #2338:
URL: https://github.com/apache/helix/pull/2338#discussion_r1068951156


##########
zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/ListenerContainer.java:
##########
@@ -0,0 +1,186 @@
+package org.apache.helix.zookeeper.zkclient;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import com.google.common.annotations.VisibleForTesting;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CopyOnWriteArraySet;
+import java.util.concurrent.locks.ReentrantLock;
+import java.util.function.Consumer;
+
+
+/**
+ * A thread-safe container wrapper class for Listeners registered to ZkClient.
+ * It stores 2 types of listener separately, one-time listener and persistent listener. The former ones are removed
+ * right after its consumption, while the latter need to be explicitly removed. 
+ * @param <T> the type of listener
+ */
+public class ListenerContainer<T> {
+  private final ReentrantLock _lock = new ReentrantLock(true);
+  private final Map<String, Set<T>> _persistentListener = new ConcurrentHashMap<>();
+  private final Map<String, Set<T>> _onetimeListener = new ConcurrentHashMap<>();
+
+  /**
+   * Add listener to the container with specified key.
+   * @param key the key to register to
+   * @param listener the listener to register
+   * @param persistListener true if the listener is persistent
+   */
+  public void addListener(String key, T listener, boolean persistListener) {
+    addListener(key, listener, persistListener ? _persistentListener : _onetimeListener);
+  }
+
+  /**
+   * Remove listener from the container.
+   * This operation removes both one-time and persistent listener from the specified key.
+   * @param key the key to remove
+   * @param listener the listener to remove
+   */
+  public void removeListener(String key, T listener) {
+    _lock.lock();
+    try {
+      removeFromListenerMap(key, listener, _persistentListener);

Review Comment:
   My knowledge may be out of date, but I still have 2 questions:
   1. Why not design the listener structure to enforce some basic metadata, such as type, so that the type is part of the listener object? Then you would not need to blindly try removing it from every possible list (imagine we have 3, 4, up to n types in the future...).
   2. Why do you need to lock in all these places? It seems simpler to just make the methods synchronized, if that is really necessary.
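
One way to read the two questions above is sketched below. This is only an illustration under stated assumptions, not the PR's code: `TypedListenerContainer` and `ListenerType` are hypothetical names, the type is recorded once at registration, and plain `synchronized` methods stand in for the explicit lock.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical sketch: neither this class nor the enum exists in the PR.
class TypedListenerContainer<T> {
  enum ListenerType { PERSISTENT, ONE_TIME }

  // key -> (listener -> type). One structure serves any number of listener types.
  private final Map<String, Map<T, ListenerType>> _listeners = new ConcurrentHashMap<>();

  // The type is supplied exactly once, when the listener is registered.
  public synchronized void addListener(String key, T listener, ListenerType type) {
    _listeners.computeIfAbsent(key, k -> new ConcurrentHashMap<>()).put(listener, type);
  }

  // Removal needs no type argument; the recorded type is returned,
  // or null if the listener was not registered under this key.
  public synchronized ListenerType removeListener(String key, T listener) {
    Map<T, ListenerType> perKey = _listeners.get(key);
    if (perKey == null) {
      return null;
    }
    ListenerType removed = perKey.remove(listener);
    if (perKey.isEmpty()) {
      _listeners.remove(key);
    }
    return removed;
  }

  public static void main(String[] args) {
    TypedListenerContainer<String> container = new TypedListenerContainer<>();
    container.addListener("/path", "listenerA", ListenerType.ONE_TIME);
    System.out.println(container.removeListener("/path", "listenerA"));
    System.out.println(container.removeListener("/path", "listenerA"));
  }
}
```

Returning the recorded type (or null) also lets the caller detect a removal that did nothing, which a `void` method cannot report.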



##########
zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/ListenerContainer.java:
##########
@@ -0,0 +1,186 @@
+package org.apache.helix.zookeeper.zkclient;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import com.google.common.annotations.VisibleForTesting;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CopyOnWriteArraySet;
+import java.util.concurrent.locks.ReentrantLock;
+import java.util.function.Consumer;
+
+
+/**
+ * A thread-safe container wrapper class for Listeners registered to ZkClient.
+ * It stores 2 types of listener separately, one-time listener and persistent listener. The former ones are removed
+ * right after its consumption, while the latter need to be explicitly removed. 
+ * @param <T> the type of listener
+ */
+public class ListenerContainer<T> {
+  private final ReentrantLock _lock = new ReentrantLock(true);
+  private final Map<String, Set<T>> _persistentListener = new ConcurrentHashMap<>();
+  private final Map<String, Set<T>> _onetimeListener = new ConcurrentHashMap<>();
+
+  /**
+   * Add listener to the container with specified key.
+   * @param key the key to register to
+   * @param listener the listener to register
+   * @param persistListener true if the listener is persistent
+   */
+  public void addListener(String key, T listener, boolean persistListener) {
+    addListener(key, listener, persistListener ? _persistentListener : _onetimeListener);
+  }
+
+  /**
+   * Remove listener from the container.
+   * This operation removes both one-time and persistent listener from the specified key.
+   * @param key the key to remove
+   * @param listener the listener to remove
+   */
+  public void removeListener(String key, T listener) {
+    _lock.lock();
+    try {
+      removeFromListenerMap(key, listener, _persistentListener);
+      removeFromListenerMap(key, listener, _onetimeListener);
+    } finally {
+      _lock.unlock();
+    }
+  }
+
+  /**
+   * Remove listener from the container.
+   * @param key the key to remove
+   * @param listener the listener to remove
+   * @param persistListener true if remove the persistent listener, otherwise remove the one-time listener
+   */
+  public void removeListener(String key, T listener, boolean persistListener) {

Review Comment:
   To add to my previous point, I think it is really ugly to make the user pass the type again on removal after they already passed it when adding the listener. Once the listener is added, the container class should have enough information to tell whether it is persistent or not.





[GitHub] [helix] qqu0127 commented on a diff in pull request #2338: Implement a thread-safe listener container for zkclient

Posted by "qqu0127 (via GitHub)" <gi...@apache.org>.
qqu0127 commented on code in PR #2338:
URL: https://github.com/apache/helix/pull/2338#discussion_r1084659594


##########
zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/ListenerContainer.java:
##########
@@ -0,0 +1,161 @@
+package org.apache.helix.zookeeper.zkclient;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import com.google.common.annotations.VisibleForTesting;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CopyOnWriteArraySet;
+import java.util.function.Consumer;
+
+
+/**
+ * A thread-safe container wrapper class for Listeners registered to ZkClient.
+ * It stores 2 types of listener separately, one-time listener and persistent listener. The former ones are removed
+ * right after its consumption, while the latter need to be explicitly removed.
+ * @param <T> the type of listener
+ */
+public class ListenerContainer<T> {
+  private final Map<String, Set<T>> _persistentListener = new ConcurrentHashMap<>();
+  private final Map<String, Set<T>> _onetimeListener = new ConcurrentHashMap<>();
+
+  /**
+   * Add listener to the container with specified key.
+   * @param key the key to register to
+   * @param listener the listener to register
+   * @param persistListener true if the listener is persistent
+   */
+  public void addListener(String key, T listener, boolean persistListener) {
+    addListener(key, listener, persistListener ? _persistentListener : _onetimeListener);
+  }
+
+  /**
+   * Remove listener from the container.
+   * This operation removes both one-time and persistent listener from the specified key.
+   * @param key the key to remove
+   * @param listener the listener to remove
+   */
+  public synchronized void removeListener(String key, T listener) {
+    removeFromListenerMap(key, listener, _persistentListener);
+    removeFromListenerMap(key, listener, _onetimeListener);
+  }
+
+  /**
+   * Remove listener from the container.
+   * @param key the key to remove
+   * @param listener the listener to remove
+   * @param persistListener true if remove the persistent listener, otherwise remove the one-time listener
+   */
+  public synchronized void removeListener(String key, T listener, boolean persistListener) {
+    if (persistListener) {
+      removeFromListenerMap(key, listener, _persistentListener);
+    } else {
+      removeFromListenerMap(key, listener, _onetimeListener);
+    }
+  }
+
+  /**
+   * Remove listeners from the container.
+   * @param key the key to remove
+   * @param listeners the listeners to remove
+   * @param persistListener true if remove the persistent listener, otherwise remove the one-time listener
+   */
+  public synchronized void removeListeners(String key, Collection<T> listeners, boolean persistListener) {
+    if (persistListener) {
+      removeFromListenerMap(key, listeners, _persistentListener);
+    } else {
+      removeFromListenerMap(key, listeners, _onetimeListener);
+    }
+  }
+
+  /**
+   * Consume the listeners registered to the given key. The given consumer is applied to all one-time and persistent
+   * listeners with the given key.
+   * The one-time listeners are removed after this operation, as a result, this operation is NOT idempotent.
+   * @param key the key of the listeners
+   * @param consumer the consuming action for the listeners
+   */
+  public void consumeListeners(String key, Consumer<? super T> consumer) {
+    Set<T> onetimeListeners;
+    Set<T> persistentListeners;
+    synchronized (this) {
+      // remove one-time listeners
+      onetimeListeners = _onetimeListener.remove(key);
+    }
+    if (onetimeListeners == null) {

Review Comment:
   Thanks for the review. 
   To my understanding, referencing `Collections.emptySet()` won't instantiate a new instance, so there is no overhead. I think it is also good for readability and unifies the control logic with the non-null case.
   Let me know if you have other thoughts on this.
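
The point about `Collections.emptySet()` can be illustrated with a small sketch. The `java.util.Collections` documentation notes that implementations need not create a separate `Set` object for each call, so the fallback is expected to be cheap. `EmptySetFallbackDemo` and `removeOrEmpty` are hypothetical names used only for this example.

```java
import java.util.Collections;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical helper, for illustration only.
class EmptySetFallbackDemo {
  // Replace a null result with an empty set so callers never branch on null.
  static <T> Set<T> removeOrEmpty(Map<String, Set<T>> map, String key) {
    Set<T> removed = map.remove(key);
    return removed == null ? Collections.<T>emptySet() : removed;
  }

  public static void main(String[] args) {
    Map<String, Set<String>> onetime = new ConcurrentHashMap<>();
    onetime.put("/present", new HashSet<>(Collections.singleton("listenerA")));

    // The same loop handles both the present and the missing key.
    for (String key : new String[] {"/present", "/missing"}) {
      for (String listener : removeOrEmpty(onetime, key)) {
        System.out.println(key + " -> " + listener);
      }
    }
  }
}
```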





[GitHub] [helix] Marcosrico commented on a diff in pull request #2338: Implement a thread-safe listener container for zkclient

Posted by GitBox <gi...@apache.org>.
Marcosrico commented on code in PR #2338:
URL: https://github.com/apache/helix/pull/2338#discussion_r1068357047


##########
zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/ListenerContainer.java:
##########
@@ -0,0 +1,186 @@
+package org.apache.helix.zookeeper.zkclient;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import com.google.common.annotations.VisibleForTesting;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CopyOnWriteArraySet;
+import java.util.concurrent.locks.ReentrantLock;
+import java.util.function.Consumer;
+
+
+/**
+ * A thread-safe container wrapper class for Listeners registered to ZkClient.
+ * It stores 2 types of listener separately, one-time listener and persistent listener. The former ones are removed
+ * right after its consumption, while the latter need to be explicitly removed. 
+ * @param <T> the type of listener
+ */
+public class ListenerContainer<T> {
+  private final ReentrantLock _lock = new ReentrantLock(true);
+  private final Map<String, Set<T>> _persistentListener = new ConcurrentHashMap<>();
+  private final Map<String, Set<T>> _onetimeListener = new ConcurrentHashMap<>();
+
+  /**
+   * Add listener to the container with specified key.
+   * @param key the key to register to
+   * @param listener the listener to register
+   * @param persistListener true if the listener is persistent
+   */
+  public void addListener(String key, T listener, boolean persistListener) {
+    addListener(key, listener, persistListener ? _persistentListener : _onetimeListener);
+  }
+
+  /**
+   * Remove listener from the container.
+   * This operation removes both one-time and persistent listener from the specified key.
+   * @param key the key to remove
+   * @param listener the listener to remove
+   */
+  public void removeListener(String key, T listener) {
+    _lock.lock();
+    try {
+      removeFromListenerMap(key, listener, _persistentListener);
+      removeFromListenerMap(key, listener, _onetimeListener);
+    } finally {
+      _lock.unlock();
+    }
+  }
+
+  /**
+   * Remove listener from the container.
+   * @param key the key to remove
+   * @param listener the listener to remove
+   * @param persistListener true if remove the persistent listener, otherwise remove the one-time listener
+   */
+  public void removeListener(String key, T listener, boolean persistListener) {
+    _lock.lock();
+    try {
+      if (persistListener) {
+        removeFromListenerMap(key, listener, _persistentListener);
+      } else {
+        removeFromListenerMap(key, listener, _onetimeListener);
+      }
+    } finally {
+      _lock.unlock();
+    }
+  }
+
+  /**
+   * Remove listeners from the container.
+   * @param key the key to remove
+   * @param listeners the listeners to remove
+   * @param persistListener true if remove the persistent listener, otherwise remove the one-time listener
+   */
+  public void removeListeners(String key, Collection<T> listeners, boolean persistListener) {
+    _lock.lock();
+    try {
+      if (persistListener) {
+        removeFromListenerMap(key, listeners, _persistentListener);
+      } else {
+        removeFromListenerMap(key, listeners, _onetimeListener);
+      }
+    } finally {
+      _lock.unlock();
+    }
+  }
+
+  /**
+   * Consume the listeners registered to the given key. The given consumer is applied to all one-time and persistent
+   * listeners with the given key.
+   * The one-time listeners are removed after this operation, as a result, this operation is NOT idempotent.
+   * @param key the key of the listeners
+   * @param consumer the consuming action for the listeners
+   */
+  public void consumeListeners(String key, Consumer<? super T> consumer) {
+    Set<T> onetimeListeners;
+    Set<T> persistentListeners;
+    _lock.lock();
+    try {
+      // remove one-time listeners
+      onetimeListeners = _onetimeListener.remove(key);

Review Comment:
   Please correct me if I'm wrong, but I believe the `remove` method in ConcurrentHashMap is atomic in itself, so I'm not sure it is necessary to lock beforehand.
   
   Also, should we check whether key is null? ConcurrentHashMap, like Hashtable, does not permit null keys, so attempting to remove a null key would throw a NullPointerException, which might not be the desired effect. Just a thought.
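
The null-key behavior mentioned above can be checked with a short sketch. `ConcurrentHashMap.remove(Object)` is documented to throw `NullPointerException` for a null key. The class `NullKeyRemoveDemo` and the guard `safeRemove` are hypothetical; whether the container should return quietly or reject a null key is a design decision the thread leaves open.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical demo class, for illustration only.
class NullKeyRemoveDemo {
  // Returns true if removing a null key from the given map throws NullPointerException.
  static boolean removeThrowsOnNull(Map<String, String> map) {
    try {
      map.remove(null);
      return false;
    } catch (NullPointerException e) {
      return true;
    }
  }

  // One possible guard: treat a null key as "nothing registered".
  static String safeRemove(Map<String, String> map, String key) {
    return key == null ? null : map.remove(key);
  }

  public static void main(String[] args) {
    Map<String, String> map = new ConcurrentHashMap<>();
    map.put("/path", "listenerA");
    System.out.println("remove(null) throws: " + removeThrowsOnNull(map));
    System.out.println("safeRemove(null): " + safeRemove(map, null));
    System.out.println("safeRemove(/path): " + safeRemove(map, "/path"));
  }
}
```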





[GitHub] [helix] jiajunwang commented on a diff in pull request #2338: Implement a thread-safe listener container for zkclient

Posted by GitBox <gi...@apache.org>.
jiajunwang commented on code in PR #2338:
URL: https://github.com/apache/helix/pull/2338#discussion_r1069800692


##########
zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/ListenerContainer.java:
##########
@@ -0,0 +1,186 @@
+package org.apache.helix.zookeeper.zkclient;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import com.google.common.annotations.VisibleForTesting;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CopyOnWriteArraySet;
+import java.util.concurrent.locks.ReentrantLock;
+import java.util.function.Consumer;
+
+
+/**
+ * A thread-safe container wrapper class for Listeners registered to ZkClient.
+ * It stores 2 types of listener separately, one-time listener and persistent listener. The former ones are removed
+ * right after its consumption, while the latter need to be explicitly removed. 
+ * @param <T> the type of listener
+ */
+public class ListenerContainer<T> {
+  private final ReentrantLock _lock = new ReentrantLock(true);
+  private final Map<String, Set<T>> _persistentListener = new ConcurrentHashMap<>();
+  private final Map<String, Set<T>> _onetimeListener = new ConcurrentHashMap<>();
+
+  /**
+   * Add listener to the container with specified key.
+   * @param key the key to register to
+   * @param listener the listener to register
+   * @param persistListener true if the listener is persistent
+   */
+  public void addListener(String key, T listener, boolean persistListener) {
+    addListener(key, listener, persistListener ? _persistentListener : _onetimeListener);
+  }
+
+  /**
+   * Remove listener from the container.
+   * This operation removes both one-time and persistent listener from the specified key.
+   * @param key the key to remove
+   * @param listener the listener to remove
+   */
+  public void removeListener(String key, T listener) {
+    _lock.lock();
+    try {
+      removeFromListenerMap(key, listener, _persistentListener);
+      removeFromListenerMap(key, listener, _onetimeListener);
+    } finally {
+      _lock.unlock();
+    }
+  }
+
+  /**
+   * Remove listener from the container.
+   * @param key the key to remove
+   * @param listener the listener to remove
+   * @param persistListener true if remove the persistent listener, otherwise remove the one-time listener
+   */
+  public void removeListener(String key, T listener, boolean persistListener) {

Review Comment:
   What I meant is that the method should look like this:
   "public void removeListener(String key, T listener)"
   
   The caller already specifies the "persistListener" type when adding the listener. The main problem with this design is that if the caller specifies a different type when requesting removal, the listener won't be removed and the caller won't know.
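
The failure mode described above can be reproduced with a stripped-down stand-in for the container. This is a sketch under assumptions: `MismatchedRemovalDemo` and `isRegistered` are hypothetical, and the two-map layout only mirrors the shape of the code under review.

```java
import java.util.Collections;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical stand-in for the container, for illustration only.
class MismatchedRemovalDemo {
  private final Map<String, Set<Object>> persistent = new ConcurrentHashMap<>();
  private final Map<String, Set<Object>> onetime = new ConcurrentHashMap<>();

  void addListener(String key, Object listener, boolean persistListener) {
    (persistListener ? persistent : onetime)
        .computeIfAbsent(key, k -> ConcurrentHashMap.newKeySet())
        .add(listener);
  }

  // Same shape as the signature under review: void, with the flag repeated by the caller.
  void removeListener(String key, Object listener, boolean persistListener) {
    Set<Object> set = (persistListener ? persistent : onetime).get(key);
    if (set != null) {
      set.remove(listener);
    }
  }

  boolean isRegistered(String key, Object listener) {
    return persistent.getOrDefault(key, Collections.emptySet()).contains(listener)
        || onetime.getOrDefault(key, Collections.emptySet()).contains(listener);
  }

  public static void main(String[] args) {
    MismatchedRemovalDemo container = new MismatchedRemovalDemo();
    container.addListener("/path", "listenerA", true);
    // Wrong flag: nothing is removed and nothing signals the mistake.
    container.removeListener("/path", "listenerA", false);
    System.out.println("still registered: " + container.isRegistered("/path", "listenerA"));
  }
}
```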





[GitHub] [helix] qqu0127 commented on a diff in pull request #2338: Implement a thread-safe listener container for zkclient

Posted by GitBox <gi...@apache.org>.
qqu0127 commented on code in PR #2338:
URL: https://github.com/apache/helix/pull/2338#discussion_r1068345686


##########
zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/ListenerContainer.java:
##########
@@ -0,0 +1,186 @@
+package org.apache.helix.zookeeper.zkclient;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import com.google.common.annotations.VisibleForTesting;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CopyOnWriteArraySet;
+import java.util.concurrent.locks.ReentrantLock;
+import java.util.function.Consumer;
+
+
+/**
+ * A thread-safe container wrapper class for Listeners registered to ZkClient.
+ * It stores 2 types of listener separately, one-time listener and persistent listener. The former ones are removed
+ * right after its consumption, while the latter need to be explicitly removed. 
+ * @param <T> the type of listener
+ */
+public class ListenerContainer<T> {
+  private final ReentrantLock _lock = new ReentrantLock(true);
+  private final Map<String, Set<T>> _persistentListener = new ConcurrentHashMap<>();
+  private final Map<String, Set<T>> _onetimeListener = new ConcurrentHashMap<>();
+
+  /**
+   * Add listener to the container with specified key.
+   * @param key the key to register to
+   * @param listener the listener to register
+   * @param persistListener true if the listener is persistent
+   */
+  public void addListener(String key, T listener, boolean persistListener) {
+    addListener(key, listener, persistListener ? _persistentListener : _onetimeListener);
+  }
+
+  /**
+   * Remove listener from the container.
+   * This operation removes both one-time and persistent listener from the specified key.
+   * @param key the key to remove
+   * @param listener the listener to remove
+   */
+  public void removeListener(String key, T listener) {
+    _lock.lock();
+    try {
+      removeFromListenerMap(key, listener, _persistentListener);
+      removeFromListenerMap(key, listener, _onetimeListener);
+    } finally {
+      _lock.unlock();
+    }
+  }
+
+  /**
+   * Remove listener from the container.
+   * @param key the key to remove
+   * @param listener the listener to remove
+   * @param persistListener true if remove the persistent listener, otherwise remove the one-time listener
+   */
+  public void removeListener(String key, T listener, boolean persistListener) {
+    _lock.lock();
+    try {
+      if (persistListener) {
+        removeFromListenerMap(key, listener, _persistentListener);
+      } else {
+        removeFromListenerMap(key, listener, _onetimeListener);
+      }
+    } finally {
+      _lock.unlock();
+    }
+  }
+
+  /**
+   * Remove listeners from the container.
+   * @param key the key to remove
+   * @param listeners the listeners to remove
+   * @param persistListener true if remove the persistent listener, otherwise remove the one-time listener
+   */
+  public void removeListeners(String key, Collection<T> listeners, boolean persistListener) {
+    _lock.lock();
+    try {
+      if (persistListener) {
+        removeFromListenerMap(key, listeners, _persistentListener);
+      } else {
+        removeFromListenerMap(key, listeners, _onetimeListener);
+      }
+    } finally {
+      _lock.unlock();
+    }
+  }
+
+  /**
+   * Consume the listeners registered to the given key. The given consumer is applied to all one-time and persistent
+   * listeners with the given key.
+   * The one-time listeners are removed after this operation, as a result, this operation is NOT idempotent.
+   * @param key the key of the listeners
+   * @param consumer the consuming action for the listeners
+   */
+  public void consumeListeners(String key, Consumer<? super T> consumer) {
+    Set<T> onetimeListeners;
+    Set<T> persistentListeners;
+    _lock.lock();
+    try {
+      // remove one-time listeners
+      onetimeListeners = _onetimeListener.remove(key);
+    } finally {
+      _lock.unlock();
+    }
+    if (onetimeListeners == null) {
+      onetimeListeners = Collections.emptySet();
+    }
+    persistentListeners = _persistentListener.getOrDefault(key, Collections.emptySet());

Review Comment:
   For one-time listeners this is a write operation that removes the entry, so we need the lock. For persistent listeners it is a pure read, so we can go lock-free with the thread-safe primitives already in use.
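
A minimal sketch of that split, assuming a simplified stand-in class (`ConsumeSketch` and its method names are hypothetical, not the PR's `ListenerContainer`): the one-time entry is detached under the lock, while the persistent set is read without it.

```java
import java.util.Collections;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Consumer;

// Hypothetical, simplified stand-in for ListenerContainer; not the PR's actual class.
class ConsumeSketch<T> {
  private final ReentrantLock lock = new ReentrantLock(true);
  private final Map<String, Set<T>> persistent = new ConcurrentHashMap<>();
  private final Map<String, Set<T>> onetime = new ConcurrentHashMap<>();

  void add(String key, T listener, boolean persist) {
    lock.lock();
    try {
      (persist ? persistent : onetime)
          .computeIfAbsent(key, k -> new CopyOnWriteArraySet<>())
          .add(listener);
    } finally {
      lock.unlock();
    }
  }

  void consume(String key, Consumer<? super T> consumer) {
    Set<T> once;
    lock.lock();
    try {
      // Write path: detaching the one-time entry must not interleave with add().
      once = onetime.remove(key);
    } finally {
      lock.unlock();
    }
    if (once != null) {
      once.forEach(consumer);
    }
    // Read path: no lock; ConcurrentHashMap and CopyOnWriteArraySet allow safe concurrent reads.
    Set<T> persistentSet = persistent.getOrDefault(key, Collections.emptySet());
    persistentSet.forEach(consumer);
  }
}
```

Calling `consume` twice for the same key fires the one-time listeners only on the first call, which is why the operation is not idempotent.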



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@helix.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@helix.apache.org
For additional commands, e-mail: reviews-help@helix.apache.org


[GitHub] [helix] rahulrane50 commented on a diff in pull request #2338: Implement a thread-safe listener container for zkclient

Posted by "rahulrane50 (via GitHub)" <gi...@apache.org>.
rahulrane50 commented on code in PR #2338:
URL: https://github.com/apache/helix/pull/2338#discussion_r1082942911


##########
zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/ListenerContainer.java:
##########
@@ -0,0 +1,186 @@
+package org.apache.helix.zookeeper.zkclient;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import com.google.common.annotations.VisibleForTesting;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CopyOnWriteArraySet;
+import java.util.concurrent.locks.ReentrantLock;
+import java.util.function.Consumer;
+
+
+/**
+ * A thread-safe container wrapper class for Listeners registered to ZkClient.
+ * It stores 2 types of listener separately, one-time listener and persistent listener. The former ones are removed
+ * right after its consumption, while the latter need to be explicitly removed. 
+ * @param <T> the type of listener
+ */
+public class ListenerContainer<T> {
+  private final ReentrantLock _lock = new ReentrantLock(true);
+  private final Map<String, Set<T>> _persistentListener = new ConcurrentHashMap<>();
+  private final Map<String, Set<T>> _onetimeListener = new ConcurrentHashMap<>();
+
+  /**
+   * Add listener to the container with specified key.
+   * @param key the key to register to
+   * @param listener the listener to register
+   * @param persistListener true if the listener is persistent
+   */
+  public void addListener(String key, T listener, boolean persistListener) {
+    addListener(key, listener, persistListener ? _persistentListener : _onetimeListener);
+  }
+
+  /**
+   * Remove listener from the container.
+   * This operation removes both one-time and persistent listener from the specified key.
+   * @param key the key to remove
+   * @param listener the listener to remove
+   */
+  public void removeListener(String key, T listener) {
+    _lock.lock();
+    try {
+      removeFromListenerMap(key, listener, _persistentListener);
+      removeFromListenerMap(key, listener, _onetimeListener);
+    } finally {
+      _lock.unlock();
+    }
+  }
+
+  /**
+   * Remove listener from the container.
+   * @param key the key to remove
+   * @param listener the listener to remove
+   * @param persistListener true if remove the persistent listener, otherwise remove the one-time listener
+   */
+  public void removeListener(String key, T listener, boolean persistListener) {
+    _lock.lock();
+    try {
+      if (persistListener) {
+        removeFromListenerMap(key, listener, _persistentListener);
+      } else {
+        removeFromListenerMap(key, listener, _onetimeListener);
+      }
+    } finally {
+      _lock.unlock();
+    }
+  }
+
+  /**
+   * Remove listeners from the container.
+   * @param key the key to remove
+   * @param listeners the listeners to remove
+   * @param persistListener true if remove the persistent listener, otherwise remove the one-time listener
+   */
+  public void removeListeners(String key, Collection<T> listeners, boolean persistListener) {
+    _lock.lock();
+    try {
+      if (persistListener) {
+        removeFromListenerMap(key, listeners, _persistentListener);
+      } else {
+        removeFromListenerMap(key, listeners, _onetimeListener);
+      }
+    } finally {
+      _lock.unlock();
+    }
+  }
+
+  /**
+   * Consume the listeners registered to the given key. The given consumer is applied to all one-time and persistent
+   * listeners with the given key.
+   * The one-time listeners are removed after this operation, as a result, this operation is NOT idempotent.
+   * @param key the key of the listeners
+   * @param consumer the consuming action for the listeners
+   */
+  public void consumeListeners(String key, Consumer<? super T> consumer) {
+    Set<T> onetimeListeners;
+    Set<T> persistentListeners;
+    _lock.lock();
+    try {
+      // remove one-time listeners
+      onetimeListeners = _onetimeListener.remove(key);
+    } finally {
+      _lock.unlock();
+    }
+    if (onetimeListeners == null) {
+      onetimeListeners = Collections.emptySet();
+    }
+    persistentListeners = _persistentListener.getOrDefault(key, Collections.emptySet());

Review Comment:
   Oops, I missed that part!





[GitHub] [helix] desaikomal commented on a diff in pull request #2338: Implement a thread-safe listener container for zkclient

Posted by GitBox <gi...@apache.org>.
desaikomal commented on code in PR #2338:
URL: https://github.com/apache/helix/pull/2338#discussion_r1068471110


##########
zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/ListenerContainer.java:
##########
@@ -0,0 +1,186 @@
+package org.apache.helix.zookeeper.zkclient;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import com.google.common.annotations.VisibleForTesting;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CopyOnWriteArraySet;
+import java.util.concurrent.locks.ReentrantLock;
+import java.util.function.Consumer;
+
+
+/**
+ * A thread-safe container wrapper class for Listeners registered to ZkClient.
+ * It stores 2 types of listener separately, one-time listener and persistent listener. The former ones are removed
+ * right after its consumption, while the latter need to be explicitly removed. 
+ * @param <T> the type of listener
+ */
+public class ListenerContainer<T> {
+  private final ReentrantLock _lock = new ReentrantLock(true);
+  private final Map<String, Set<T>> _persistentListener = new ConcurrentHashMap<>();
+  private final Map<String, Set<T>> _onetimeListener = new ConcurrentHashMap<>();
+
+  /**
+   * Add listener to the container with specified key.
+   * @param key the key to register to
+   * @param listener the listener to register
+   * @param persistListener true if the listener is persistent
+   */
+  public void addListener(String key, T listener, boolean persistListener) {
+    addListener(key, listener, persistListener ? _persistentListener : _onetimeListener);
+  }
+
+  /**
+   * Remove listener from the container.
+   * This operation removes both one-time and persistent listener from the specified key.
+   * @param key the key to remove
+   * @param listener the listener to remove
+   */
+  public void removeListener(String key, T listener) {
+    _lock.lock();
+    try {
+      removeFromListenerMap(key, listener, _persistentListener);
+      removeFromListenerMap(key, listener, _onetimeListener);
+    } finally {
+      _lock.unlock();
+    }
+  }
+
+  /**
+   * Remove listener from the container.
+   * @param key the key to remove
+   * @param listener the listener to remove
+   * @param persistListener true if remove the persistent listener, otherwise remove the one-time listener
+   */
+  public void removeListener(String key, T listener, boolean persistListener) {
+    _lock.lock();
+    try {
+      if (persistListener) {
+        removeFromListenerMap(key, listener, _persistentListener);
+      } else {
+        removeFromListenerMap(key, listener, _onetimeListener);
+      }
+    } finally {
+      _lock.unlock();
+    }
+  }
+
+  /**
+   * Remove listeners from the container.
+   * @param key the key to remove
+   * @param listeners the listeners to remove
+   * @param persistListener true if remove the persistent listener, otherwise remove the one-time listener
+   */
+  public void removeListeners(String key, Collection<T> listeners, boolean persistListener) {
+    _lock.lock();
+    try {
+      if (persistListener) {
+        removeFromListenerMap(key, listeners, _persistentListener);
+      } else {
+        removeFromListenerMap(key, listeners, _onetimeListener);
+      }
+    } finally {
+      _lock.unlock();
+    }
+  }
+
+  /**
+   * Consume the listeners registered to the given key. The given consumer is applied to all one-time and persistent
+   * listeners with the given key.
+   * The one-time listeners are removed after this operation, as a result, this operation is NOT idempotent.
+   * @param key the key of the listeners
+   * @param consumer the consuming action for the listeners
+   */
+  public void consumeListeners(String key, Consumer<? super T> consumer) {
+    Set<T> onetimeListeners;
+    Set<T> persistentListeners;
+    _lock.lock();
+    try {
+      // remove one-time listeners
+      onetimeListeners = _onetimeListener.remove(key);

Review Comment:
   For deterministic test behavior, I think the lock will help, but we should code for the correctness of the system.
   
   If you are going to use the lock explicitly, then you can use a regular HashMap. Otherwise we pay the synchronization cost twice, since the ConcurrentHashMap implementation already locks internally on writes (per bin in JDK 8, per segment in earlier versions):
   https://github.com/frohoff/jdk8u-jdk/blob/master/src/share/classes/java/util/concurrent/ConcurrentHashMap.java#L1370
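
As a rough illustration of the "explicit lock plus regular HashMap" option, here is a sketch under stated assumptions: `LockedMapSketch`, `add`, and `snapshot` are hypothetical names, and a `ReentrantReadWriteLock` is swapped in for the PR's single `ReentrantLock` so that readers can at least run in parallel with each other.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.locks.ReentrantReadWriteLock;

// Hypothetical alternative, not the PR's code: plain HashMap, every access under a lock.
class LockedMapSketch<T> {
  private final ReentrantReadWriteLock rw = new ReentrantReadWriteLock();
  private final Map<String, Set<T>> listeners = new HashMap<>();

  void add(String key, T listener) {
    rw.writeLock().lock();
    try {
      listeners.computeIfAbsent(key, k -> new HashSet<>()).add(listener);
    } finally {
      rw.writeLock().unlock();
    }
  }

  // Readers must lock too, because HashMap and HashSet are unsafe to read during a write.
  List<T> snapshot(String key) {
    rw.readLock().lock();
    try {
      List<T> copy = new ArrayList<>();
      Set<T> set = listeners.get(key);
      if (set != null) {
        copy.addAll(set);
      }
      return copy;
    } finally {
      rw.readLock().unlock();
    }
  }
}
```

The trade-off in this variant is that every read takes a lock and copies the set, whereas the concurrent collections allow reads without any locking.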
   





[GitHub] [helix] qqu0127 commented on pull request #2338: Implement a thread-safe listener container for zkclient

Posted by "qqu0127 (via GitHub)" <gi...@apache.org>.
qqu0127 commented on PR #2338:
URL: https://github.com/apache/helix/pull/2338#issuecomment-1401136033

   @mgao0 Thanks for the comments. `using T for listener types might be too wildcard` is a great point. However, I don't see a practical way to abstract a unified interface out of all the possible ways listeners are consumed. At best we could apply marker interfaces, but that would involve global changes to the existing listener types, so I didn't pursue it.




[GitHub] [helix] jiajunwang commented on a diff in pull request #2338: Implement a thread-safe listener container for zkclient

Posted by GitBox <gi...@apache.org>.
jiajunwang commented on code in PR #2338:
URL: https://github.com/apache/helix/pull/2338#discussion_r1069800692


##########
zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/ListenerContainer.java:
##########
@@ -0,0 +1,186 @@
+package org.apache.helix.zookeeper.zkclient;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import com.google.common.annotations.VisibleForTesting;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CopyOnWriteArraySet;
+import java.util.concurrent.locks.ReentrantLock;
+import java.util.function.Consumer;
+
+
+/**
+ * A thread-safe container wrapper class for Listeners registered to ZkClient.
+ * It stores 2 types of listener separately, one-time listener and persistent listener. The former ones are removed
+ * right after its consumption, while the latter need to be explicitly removed. 
+ * @param <T> the type of listener
+ */
+public class ListenerContainer<T> {
+  private final ReentrantLock _lock = new ReentrantLock(true);
+  private final Map<String, Set<T>> _persistentListener = new ConcurrentHashMap<>();
+  private final Map<String, Set<T>> _onetimeListener = new ConcurrentHashMap<>();
+
+  /**
+   * Add listener to the container with specified key.
+   * @param key the key to register to
+   * @param listener the listener to register
+   * @param persistListener true if the listener is persistent
+   */
+  public void addListener(String key, T listener, boolean persistListener) {
+    addListener(key, listener, persistListener ? _persistentListener : _onetimeListener);
+  }
+
+  /**
+   * Remove listener from the container.
+   * This operation removes both one-time and persistent listener from the specified key.
+   * @param key the key to remove
+   * @param listener the listener to remove
+   */
+  public void removeListener(String key, T listener) {
+    _lock.lock();
+    try {
+      removeFromListenerMap(key, listener, _persistentListener);
+      removeFromListenerMap(key, listener, _onetimeListener);
+    } finally {
+      _lock.unlock();
+    }
+  }
+
+  /**
+   * Remove listener from the container.
+   * @param key the key to remove
+   * @param listener the listener to remove
+   * @param persistListener true if remove the persistent listener, otherwise remove the one-time listener
+   */
+  public void removeListener(String key, T listener, boolean persistListener) {

Review Comment:
   What I meant is that the method should look like this:
   "public void removeListener(String key, T listener)"
   
   The caller already specifies the "persistListener" type when adding the listener. The main problem with this design is that if the caller specifies a different type when requesting removal, the listener won't be removed and the caller won't know (actually, no one will know until the server hits OOM).
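
The failure mode described here can be sketched as follows. `RemoveFlagSketch` is a hypothetical, single-threaded stand-in (locking omitted on purpose) that offers both removal shapes, so the silent no-op of a mismatched flag is easy to see.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical, single-threaded stand-in; locking is left out to keep the focus on the API shape.
class RemoveFlagSketch {
  private final Map<String, Set<Object>> persistent = new HashMap<>();
  private final Map<String, Set<Object>> onetime = new HashMap<>();

  void add(String key, Object listener, boolean persist) {
    (persist ? persistent : onetime).computeIfAbsent(key, k -> new HashSet<>()).add(listener);
  }

  // Flagged removal: silently does nothing if the flag differs from the one used in add().
  void remove(String key, Object listener, boolean persist) {
    removeFrom(persist ? persistent : onetime, key, listener);
  }

  // Flag-free removal: checks both maps, so the caller cannot pick the wrong one.
  void remove(String key, Object listener) {
    removeFrom(persistent, key, listener);
    removeFrom(onetime, key, listener);
  }

  // Number of listeners still registered under the key, across both maps.
  int count(String key) {
    return persistent.getOrDefault(key, new HashSet<>()).size()
        + onetime.getOrDefault(key, new HashSet<>()).size();
  }

  private static void removeFrom(Map<String, Set<Object>> map, String key, Object listener) {
    Set<Object> set = map.get(key);
    if (set != null && set.remove(listener) && set.isEmpty()) {
      map.remove(key);
    }
  }
}
```

Adding with `persist = true` and then calling `remove(key, listener, false)` leaves `count(key)` at 1 with no error, which is the leak being warned about; `remove(key, listener)` clears it regardless of how it was added.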





[GitHub] [helix] desaikomal commented on a diff in pull request #2338: Implement a thread-safe listener container for zkclient

Posted by GitBox <gi...@apache.org>.
desaikomal commented on code in PR #2338:
URL: https://github.com/apache/helix/pull/2338#discussion_r1068365894


##########
zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/ListenerContainer.java:
##########
@@ -0,0 +1,186 @@
+package org.apache.helix.zookeeper.zkclient;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import com.google.common.annotations.VisibleForTesting;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CopyOnWriteArraySet;
+import java.util.concurrent.locks.ReentrantLock;
+import java.util.function.Consumer;
+
+
+/**
+ * A thread-safe container wrapper class for Listeners registered to ZkClient.
+ * It stores 2 types of listener separately, one-time listener and persistent listener. The former ones are removed

Review Comment:
   Do we have any users of one-time listeners?
   AFAIK there are none. Can you please confirm?



##########
zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/ListenerContainer.java:
##########
@@ -0,0 +1,186 @@
+package org.apache.helix.zookeeper.zkclient;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import com.google.common.annotations.VisibleForTesting;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CopyOnWriteArraySet;
+import java.util.concurrent.locks.ReentrantLock;
+import java.util.function.Consumer;
+
+
+/**
+ * A thread-safe container wrapper class for Listeners registered to ZkClient.
+ * It stores 2 types of listener separately, one-time listener and persistent listener. The former ones are removed
+ * right after its consumption, while the latter need to be explicitly removed. 
+ * @param <T> the type of listener
+ */
+public class ListenerContainer<T> {
+  private final ReentrantLock _lock = new ReentrantLock(true);
+  private final Map<String, Set<T>> _persistentListener = new ConcurrentHashMap<>();

Review Comment:
   ConcurrentHashMap has atomic methods for concurrent access, so I am not sure you require an explicit lock.
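
One possible reading of this suggestion, sketched under assumptions: `AtomicOnetimeSketch` and its methods are hypothetical and cover only the one-time map. It relies on `ConcurrentHashMap.compute`, `computeIfPresent`, and `remove` each being atomic for a given key, and it is not what the PR implements.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArraySet;

// Hypothetical lock-free variant for one-time listeners only; not what the PR implements.
class AtomicOnetimeSketch<T> {
  private final ConcurrentHashMap<String, Set<T>> onetime = new ConcurrentHashMap<>();

  void add(String key, T listener) {
    // compute() runs atomically for the key, so the set cannot be detached mid-add.
    onetime.compute(key, (k, set) -> {
      if (set == null) {
        set = new CopyOnWriteArraySet<>();
      }
      set.add(listener);
      return set;
    });
  }

  void remove(String key, T listener) {
    // Returning null from the remapping function drops the key once its set is empty.
    onetime.computeIfPresent(key, (k, set) -> {
      set.remove(listener);
      return set.isEmpty() ? null : set;
    });
  }

  List<T> drain(String key) {
    // remove() is atomic as well: every add either landed in this set or starts a new one.
    List<T> drained = new ArrayList<>();
    Set<T> detached = onetime.remove(key);
    if (detached != null) {
      drained.addAll(detached);
    }
    return drained;
  }
}
```

The cost of this shape is that all mutation of the inner set has to go through the map's remapping functions; any code path that fetches the set with `get` and mutates it directly reintroduces the race.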



##########
zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/ListenerContainer.java:
##########
@@ -0,0 +1,186 @@
+package org.apache.helix.zookeeper.zkclient;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import com.google.common.annotations.VisibleForTesting;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CopyOnWriteArraySet;
+import java.util.concurrent.locks.ReentrantLock;
+import java.util.function.Consumer;
+
+
+/**
+ * A thread-safe container wrapper class for Listeners registered to ZkClient.
+ * It stores 2 types of listener separately, one-time listener and persistent listener. The former ones are removed
+ * right after its consumption, while the latter need to be explicitly removed. 
+ * @param <T> the type of listener
+ */
+public class ListenerContainer<T> {
+  private final ReentrantLock _lock = new ReentrantLock(true);
+  private final Map<String, Set<T>> _persistentListener = new ConcurrentHashMap<>();
+  private final Map<String, Set<T>> _onetimeListener = new ConcurrentHashMap<>();
+
+  /**
+   * Add listener to the container with specified key.
+   * @param key the key to register to
+   * @param listener the listener to register
+   * @param persistListener true if the listener is persistent
+   */
+  public void addListener(String key, T listener, boolean persistListener) {
+    addListener(key, listener, persistListener ? _persistentListener : _onetimeListener);
+  }
+
+  /**
+   * Remove listener from the container.
+   * This operation removes both one-time and persistent listener from the specified key.
+   * @param key the key to remove
+   * @param listener the listener to remove
+   */
+  public void removeListener(String key, T listener) {

Review Comment:
   If we remove the one-time listener, a lot of the code will become simpler.
   Can you first please let me know who needs the one-time listener?
   
   Thanks,





[GitHub] [helix] qqu0127 commented on a diff in pull request #2338: Implement a thread-safe listener container for zkclient

Posted by GitBox <gi...@apache.org>.
qqu0127 commented on code in PR #2338:
URL: https://github.com/apache/helix/pull/2338#discussion_r1068506241


##########
zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/ListenerContainer.java:
##########
@@ -0,0 +1,186 @@
+package org.apache.helix.zookeeper.zkclient;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import com.google.common.annotations.VisibleForTesting;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CopyOnWriteArraySet;
+import java.util.concurrent.locks.ReentrantLock;
+import java.util.function.Consumer;
+
+
+/**
+ * A thread-safe container wrapper class for Listeners registered to ZkClient.
+ * It stores 2 types of listener separately, one-time listener and persistent listener. The former ones are removed
+ * right after its consumption, while the latter need to be explicitly removed. 
+ * @param <T> the type of listener
+ */
+public class ListenerContainer<T> {
+  private final ReentrantLock _lock = new ReentrantLock(true);
+  private final Map<String, Set<T>> _persistentListener = new ConcurrentHashMap<>();
+  private final Map<String, Set<T>> _onetimeListener = new ConcurrentHashMap<>();
+
+  /**
+   * Add listener to the container with specified key.
+   * @param key the key to register to
+   * @param listener the listener to register
+   * @param persistListener true if the listener is persistent
+   */
+  public void addListener(String key, T listener, boolean persistListener) {
+    addListener(key, listener, persistListener ? _persistentListener : _onetimeListener);
+  }
+
+  /**
+   * Remove listener from the container.
+   * This operation removes both one-time and persistent listener from the specified key.
+   * @param key the key to remove
+   * @param listener the listener to remove
+   */
+  public void removeListener(String key, T listener) {
+    _lock.lock();
+    try {
+      removeFromListenerMap(key, listener, _persistentListener);
+      removeFromListenerMap(key, listener, _onetimeListener);
+    } finally {
+      _lock.unlock();
+    }
+  }
+
+  /**
+   * Remove listener from the container.
+   * @param key the key to remove
+   * @param listener the listener to remove
+   * @param persistListener true if remove the persistent listener, otherwise remove the one-time listener
+   */
+  public void removeListener(String key, T listener, boolean persistListener) {
+    _lock.lock();
+    try {
+      if (persistListener) {
+        removeFromListenerMap(key, listener, _persistentListener);
+      } else {
+        removeFromListenerMap(key, listener, _onetimeListener);
+      }
+    } finally {
+      _lock.unlock();
+    }
+  }
+
+  /**
+   * Remove listeners from the container.
+   * @param key the key to remove
+   * @param listeners the listeners to remove
+   * @param persistListener true if remove the persistent listener, otherwise remove the one-time listener
+   */
+  public void removeListeners(String key, Collection<T> listeners, boolean persistListener) {
+    _lock.lock();
+    try {
+      if (persistListener) {
+        removeFromListenerMap(key, listeners, _persistentListener);
+      } else {
+        removeFromListenerMap(key, listeners, _onetimeListener);
+      }
+    } finally {
+      _lock.unlock();
+    }
+  }
+
+  /**
+   * Consume the listeners registered to the given key. The given consumer is applied to all one-time and persistent
+   * listeners with the given key.
+   * The one-time listeners are removed after this operation, as a result, this operation is NOT idempotent.
+   * @param key the key of the listeners
+   * @param consumer the consuming action for the listeners
+   */
+  public void consumeListeners(String key, Consumer<? super T> consumer) {
+    Set<T> onetimeListeners;
+    Set<T> persistentListeners;
+    _lock.lock();
+    try {
+      // remove one-time listeners
+      onetimeListeners = _onetimeListener.remove(key);

Review Comment:
   Thanks @desaikomal for the pointer. The short answer is that we need the lock for correctness, and the current implementation already works this way.
   Using a CopyOnWriteArraySet as the value in a ConcurrentHashMap requires a lock to make the overall structure thread-safe. Otherwise there is a race condition between consume and add, and a listener might be lost. The same holds for the current implementation, please see https://github.com/apache/helix/blob/master/zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/ZkClient.java#L290
   
   On the other hand, using a HashMap instead of a ConcurrentHashMap could work, but it would require a read lock whenever we iterate. In our use case, reads vastly outnumber writes, so a lock-free read is preferable.
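
The locking scheme described here can be sketched roughly as follows. This is a simplified illustration, not the PR's code: the class name `LockedWriteSketch` and its methods `add`, `drain`, and `count` are hypothetical. Writes take a lock so that a lookup-and-add cannot interleave with the removal of an entry, while reads go straight to the ConcurrentHashMap.

```java
import java.util.Collections;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical, simplified stand-in for the container under review.
class LockedWriteSketch<T> {
  private final ReentrantLock lock = new ReentrantLock();
  private final Map<String, Set<T>> listeners = new ConcurrentHashMap<>();

  // Write path: lookup-or-create and add happen under one lock,
  // so a concurrent drain cannot detach the set in between.
  void add(String key, T listener) {
    lock.lock();
    try {
      listeners.computeIfAbsent(key, k -> new CopyOnWriteArraySet<>()).add(listener);
    } finally {
      lock.unlock();
    }
  }

  // Write path: detaching the whole entry takes the same lock.
  Set<T> drain(String key) {
    lock.lock();
    try {
      Set<T> removed = listeners.remove(key);
      return removed == null ? Collections.<T>emptySet() : removed;
    } finally {
      lock.unlock();
    }
  }

  // Read path: no lock is taken; the map and the copy-on-write sets
  // are individually safe to read concurrently.
  int count(String key) {
    return listeners.getOrDefault(key, Collections.emptySet()).size();
  }
}
```

The trade-off in this sketch is that writers serialize on one lock, which is acceptable when reads dominate, as the comment above argues.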



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@helix.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@helix.apache.org
For additional commands, e-mail: reviews-help@helix.apache.org


[GitHub] [helix] qqu0127 commented on a diff in pull request #2338: Implement a thread-safe listener container for zkclient

Posted by GitBox <gi...@apache.org>.
qqu0127 commented on code in PR #2338:
URL: https://github.com/apache/helix/pull/2338#discussion_r1068348501


##########
zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/ListenerContainer.java:
##########
@@ -0,0 +1,186 @@
+package org.apache.helix.zookeeper.zkclient;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import com.google.common.annotations.VisibleForTesting;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CopyOnWriteArraySet;
+import java.util.concurrent.locks.ReentrantLock;
+import java.util.function.Consumer;
+
+
+/**
+ * A thread-safe container wrapper class for Listeners registered to ZkClient.
+ * It stores 2 types of listener separately, one-time listener and persistent listener. The former ones are removed
+ * right after its consumption, while the latter need to be explicitly removed. 
+ * @param <T> the type of listener
+ */
+public class ListenerContainer<T> {
+  private final ReentrantLock _lock = new ReentrantLock(true);

Review Comment:
   The lock is used in all write APIs: add, remove, and consume. We could use the synchronized keyword instead.
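
For comparison, here is a minimal sketch of the two locking styles. It uses a hypothetical class `LockStyles` that guards plain counters rather than listener maps. One difference worth noting is that `new ReentrantLock(true)` requests a fair lock, whereas `synchronized` offers no fairness option.

```java
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical example class; not part of the PR.
class LockStyles {
  // 'true' requests a fair lock: under contention, the longest-waiting thread is favored.
  private final ReentrantLock lock = new ReentrantLock(true);
  private int lockedCount;
  private int syncCount;

  // Explicit lock: the unlock call must sit in a finally block.
  void incrementWithLock() {
    lock.lock();
    try {
      lockedCount++;
    } finally {
      lock.unlock();
    }
  }

  int lockedCount() {
    lock.lock();
    try {
      return lockedCount;
    } finally {
      lock.unlock();
    }
  }

  // Intrinsic lock on 'this': released automatically when the method exits.
  synchronized void incrementWithSynchronized() {
    syncCount++;
  }

  synchronized int syncCount() {
    return syncCount;
  }

  boolean usesFairLock() {
    return lock.isFair();
  }
}
```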





[GitHub] [helix] qqu0127 commented on a diff in pull request #2338: Implement a thread-safe listener container for zkclient

Posted by GitBox <gi...@apache.org>.
qqu0127 commented on code in PR #2338:
URL: https://github.com/apache/helix/pull/2338#discussion_r1068378381


##########
zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/ListenerContainer.java:
##########
@@ -0,0 +1,186 @@
+package org.apache.helix.zookeeper.zkclient;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import com.google.common.annotations.VisibleForTesting;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CopyOnWriteArraySet;
+import java.util.concurrent.locks.ReentrantLock;
+import java.util.function.Consumer;
+
+
+/**
+ * A thread-safe container wrapper class for Listeners registered to ZkClient.
+ * It stores 2 types of listener separately, one-time listener and persistent listener. The former ones are removed
+ * right after its consumption, while the latter need to be explicitly removed. 
+ * @param <T> the type of listener
+ */
+public class ListenerContainer<T> {
+  private final ReentrantLock _lock = new ReentrantLock(true);
+  private final Map<String, Set<T>> _persistentListener = new ConcurrentHashMap<>();
+  private final Map<String, Set<T>> _onetimeListener = new ConcurrentHashMap<>();
+
+  /**
+   * Add listener to the container with specified key.
+   * @param key the key to register to
+   * @param listener the listener to register
+   * @param persistListener true if the listener is persistent
+   */
+  public void addListener(String key, T listener, boolean persistListener) {
+    addListener(key, listener, persistListener ? _persistentListener : _onetimeListener);
+  }
+
+  /**
+   * Remove listener from the container.
+   * This operation removes both one-time and persistent listener from the specified key.
+   * @param key the key to remove
+   * @param listener the listener to remove
+   */
+  public void removeListener(String key, T listener) {
+    _lock.lock();
+    try {
+      removeFromListenerMap(key, listener, _persistentListener);
+      removeFromListenerMap(key, listener, _onetimeListener);
+    } finally {
+      _lock.unlock();
+    }
+  }
+
+  /**
+   * Remove listener from the container.
+   * @param key the key to remove
+   * @param listener the listener to remove
+   * @param persistListener true if remove the persistent listener, otherwise remove the one-time listener
+   */
+  public void removeListener(String key, T listener, boolean persistListener) {
+    _lock.lock();
+    try {
+      if (persistListener) {
+        removeFromListenerMap(key, listener, _persistentListener);
+      } else {
+        removeFromListenerMap(key, listener, _onetimeListener);
+      }
+    } finally {
+      _lock.unlock();
+    }
+  }
+
+  /**
+   * Remove listeners from the container.
+   * @param key the key to remove
+   * @param listeners the listeners to remove
+   * @param persistListener true if remove the persistent listener, otherwise remove the one-time listener
+   */
+  public void removeListeners(String key, Collection<T> listeners, boolean persistListener) {
+    _lock.lock();
+    try {
+      if (persistListener) {
+        removeFromListenerMap(key, listeners, _persistentListener);
+      } else {
+        removeFromListenerMap(key, listeners, _onetimeListener);
+      }
+    } finally {
+      _lock.unlock();
+    }
+  }
+
+  /**
+   * Consume the listeners registered to the given key. The given consumer is applied to all one-time and persistent
+   * listeners with the given key.
+   * The one-time listeners are removed after this operation, as a result, this operation is NOT idempotent.
+   * @param key the key of the listeners
+   * @param consumer the consuming action for the listeners
+   */
+  public void consumeListeners(String key, Consumer<? super T> consumer) {
+    Set<T> onetimeListeners;
+    Set<T> persistentListeners;
+    _lock.lock();
+    try {
+      // remove one-time listeners
+      onetimeListeners = _onetimeListener.remove(key);

Review Comment:
   Good question. I think there is a subtle race condition between `consume` and `add`. Suppose thread A adds a one-time listener T while thread B consumes the listeners, as in line 122. If the timing is unfortunate, `onetimeListeners` may take its snapshot before A completes, so it doesn't contain T. However, because the map entry is removed together with its CopyOnWriteArraySet instance, the new listener T is lost.
   (The unit test fails if we remove the lock.)
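
The unlucky interleaving can be replayed deterministically on a single thread. The sketch below is an illustration under assumptions: the class `LostListenerDemo` is hypothetical, the two "threads" are simulated by ordering the statements by hand, and copying the set into a list stands in for the snapshot that iteration over a CopyOnWriteArraySet would see.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArraySet;

// Hypothetical demo class; replays the race without real threads.
class LostListenerDemo {
  static boolean replayUnluckyInterleaving() {
    Map<String, Set<String>> onetime = new ConcurrentHashMap<>();
    onetime.computeIfAbsent("/node", k -> new CopyOnWriteArraySet<>()).add("existing");

    // Thread A, step 1: look up the set it is about to add to.
    Set<String> seenByA = onetime.get("/node");

    // Thread B: detach the entry and snapshot its contents for consumption.
    Set<String> detached = onetime.remove("/node");
    List<String> consumed = new ArrayList<>(detached);

    // Thread A, step 2: add T to a set that is no longer in the map.
    seenByA.add("T");

    boolean consumedT = consumed.contains("T");
    boolean stillRegistered = onetime.containsKey("/node");
    // T was neither consumed nor kept for the next event, so it is lost.
    return !consumedT && !stillRegistered;
  }
}
```

Holding one lock around both A's lookup-and-add and B's remove rules out this ordering, because A's two steps can no longer straddle B's removal.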





[GitHub] [helix] qqu0127 commented on a diff in pull request #2338: Implement a thread-safe listener container for zkclient

Posted by GitBox <gi...@apache.org>.
qqu0127 commented on code in PR #2338:
URL: https://github.com/apache/helix/pull/2338#discussion_r1068397632


##########
zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/ListenerContainer.java:
##########
@@ -0,0 +1,186 @@
+package org.apache.helix.zookeeper.zkclient;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import com.google.common.annotations.VisibleForTesting;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CopyOnWriteArraySet;
+import java.util.concurrent.locks.ReentrantLock;
+import java.util.function.Consumer;
+
+
+/**
+ * A thread-safe container wrapper class for Listeners registered to ZkClient.
+ * It stores 2 types of listener separately, one-time listener and persistent listener. The former ones are removed
+ * right after its consumption, while the latter need to be explicitly removed. 
+ * @param <T> the type of listener
+ */
+public class ListenerContainer<T> {
+  private final ReentrantLock _lock = new ReentrantLock(true);
+  private final Map<String, Set<T>> _persistentListener = new ConcurrentHashMap<>();

Review Comment:
   ConcurrentHashMap is thread-safe at the instance level, but it doesn't make operations that span the map and its value sets thread-safe. The lock is required; see https://github.com/apache/helix/pull/2338#discussion_r1068357047





[GitHub] [helix] Marcosrico commented on a diff in pull request #2338: Implement a thread-safe listener container for zkclient

Posted by GitBox <gi...@apache.org>.
Marcosrico commented on code in PR #2338:
URL: https://github.com/apache/helix/pull/2338#discussion_r1068357047


##########
zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/ListenerContainer.java:
##########
@@ -0,0 +1,186 @@
+package org.apache.helix.zookeeper.zkclient;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import com.google.common.annotations.VisibleForTesting;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CopyOnWriteArraySet;
+import java.util.concurrent.locks.ReentrantLock;
+import java.util.function.Consumer;
+
+
+/**
+ * A thread-safe container wrapper class for Listeners registered to ZkClient.
+ * It stores 2 types of listener separately, one-time listener and persistent listener. The former ones are removed
+ * right after its consumption, while the latter need to be explicitly removed. 
+ * @param <T> the type of listener
+ */
+public class ListenerContainer<T> {
+  private final ReentrantLock _lock = new ReentrantLock(true);
+  private final Map<String, Set<T>> _persistentListener = new ConcurrentHashMap<>();
+  private final Map<String, Set<T>> _onetimeListener = new ConcurrentHashMap<>();
+
+  /**
+   * Add listener to the container with specified key.
+   * @param key the key to register to
+   * @param listener the listener to register
+   * @param persistListener true if the listener is persistent
+   */
+  public void addListener(String key, T listener, boolean persistListener) {
+    addListener(key, listener, persistListener ? _persistentListener : _onetimeListener);
+  }
+
+  /**
+   * Remove listener from the container.
+   * This operation removes both one-time and persistent listener from the specified key.
+   * @param key the key to remove
+   * @param listener the listener to remove
+   */
+  public void removeListener(String key, T listener) {
+    _lock.lock();
+    try {
+      removeFromListenerMap(key, listener, _persistentListener);
+      removeFromListenerMap(key, listener, _onetimeListener);
+    } finally {
+      _lock.unlock();
+    }
+  }
+
+  /**
+   * Remove listener from the container.
+   * @param key the key to remove
+   * @param listener the listener to remove
+   * @param persistListener true if remove the persistent listener, otherwise remove the one-time listener
+   */
+  public void removeListener(String key, T listener, boolean persistListener) {
+    _lock.lock();
+    try {
+      if (persistListener) {
+        removeFromListenerMap(key, listener, _persistentListener);
+      } else {
+        removeFromListenerMap(key, listener, _onetimeListener);
+      }
+    } finally {
+      _lock.unlock();
+    }
+  }
+
+  /**
+   * Remove listeners from the container.
+   * @param key the key to remove
+   * @param listeners the listeners to remove
+   * @param persistListener true if remove the persistent listener, otherwise remove the one-time listener
+   */
+  public void removeListeners(String key, Collection<T> listeners, boolean persistListener) {
+    _lock.lock();
+    try {
+      if (persistListener) {
+        removeFromListenerMap(key, listeners, _persistentListener);
+      } else {
+        removeFromListenerMap(key, listeners, _onetimeListener);
+      }
+    } finally {
+      _lock.unlock();
+    }
+  }
+
+  /**
+   * Consume the listeners registered to the given key. The given consumer is applied to all one-time and persistent
+   * listeners with the given key.
+   * The one-time listeners are removed after this operation, as a result, this operation is NOT idempotent.
+   * @param key the key of the listeners
+   * @param consumer the consuming action for the listeners
+   */
+  public void consumeListeners(String key, Consumer<? super T> consumer) {
+    Set<T> onetimeListeners;
+    Set<T> persistentListeners;
+    _lock.lock();
+    try {
+      // remove one-time listeners
+      onetimeListeners = _onetimeListener.remove(key);

Review Comment:
   Please correct me if I'm wrong, but I believe the remove method in ConcurrentHashMap is atomic by itself, so I'm not sure it is necessary to lock beforehand.
   
   Also, should we check whether key is null? ConcurrentHashMap, like Hashtable, does not permit null keys, so attempting to remove a null key would throw a NullPointerException, which might not be the desired effect. Just a thought.
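
The null-key behaviour is easy to confirm in isolation. In the sketch below, `NullKeyDemo` and its helper `removeOrEmpty` are hypothetical names. Whether a null key should be ignored silently, as the helper does, or rejected with an explicit exception is a design decision the sketch does not settle.

```java
import java.util.Collections;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical demo class; not part of the PR.
class NullKeyDemo {
  // ConcurrentHashMap rejects null keys, so remove(null) throws.
  static boolean removeNullThrows() {
    Map<String, Set<String>> map = new ConcurrentHashMap<>();
    try {
      map.remove(null);
      return false;
    } catch (NullPointerException expected) {
      return true;
    }
  }

  // One possible guard: treat a null key as "no listeners registered".
  static Set<String> removeOrEmpty(Map<String, Set<String>> map, String key) {
    if (key == null) {
      return Collections.emptySet();
    }
    Set<String> removed = map.remove(key);
    return removed == null ? Collections.<String>emptySet() : removed;
  }
}
```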





[GitHub] [helix] rahulrane50 commented on a diff in pull request #2338: Implement a thread-safe listener container for zkclient

Posted by GitBox <gi...@apache.org>.
rahulrane50 commented on code in PR #2338:
URL: https://github.com/apache/helix/pull/2338#discussion_r1067611766


##########
zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/ListenerContainer.java:
##########
@@ -0,0 +1,186 @@
+package org.apache.helix.zookeeper.zkclient;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import com.google.common.annotations.VisibleForTesting;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CopyOnWriteArraySet;
+import java.util.concurrent.locks.ReentrantLock;
+import java.util.function.Consumer;
+
+
+/**
+ * A thread-safe container wrapper class for Listeners registered to ZkClient.
+ * It stores 2 types of listener separately, one-time listener and persistent listener. The former ones are removed
+ * right after its consumption, while the latter need to be explicitly removed. 
+ * @param <T> the type of listener
+ */
+public class ListenerContainer<T> {
+  private final ReentrantLock _lock = new ReentrantLock(true);

Review Comment:
   I see that this lock is used only once in the APIs below. I commented there as well, and I would like to know whether it is really needed.



##########
zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/ListenerContainer.java:
##########
@@ -0,0 +1,186 @@
+package org.apache.helix.zookeeper.zkclient;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import com.google.common.annotations.VisibleForTesting;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CopyOnWriteArraySet;
+import java.util.concurrent.locks.ReentrantLock;
+import java.util.function.Consumer;
+
+
+/**
+ * A thread-safe container wrapper class for Listeners registered to ZkClient.
+ * It stores 2 types of listener separately, one-time listener and persistent listener. The former ones are removed
+ * right after its consumption, while the latter need to be explicitly removed. 
+ * @param <T> the type of listener
+ */
+public class ListenerContainer<T> {
+  private final ReentrantLock _lock = new ReentrantLock(true);
+  private final Map<String, Set<T>> _persistentListener = new ConcurrentHashMap<>();
+  private final Map<String, Set<T>> _onetimeListener = new ConcurrentHashMap<>();
+
+  /**
+   * Add listener to the container with specified key.
+   * @param key the key to register to
+   * @param listener the listener to register
+   * @param persistListener true if the listener is persistent
+   */
+  public void addListener(String key, T listener, boolean persistListener) {
+    addListener(key, listener, persistListener ? _persistentListener : _onetimeListener);
+  }
+
+  /**
+   * Remove listener from the container.
+   * This operation removes both one-time and persistent listener from the specified key.
+   * @param key the key to remove
+   * @param listener the listener to remove
+   */
+  public void removeListener(String key, T listener) {
+    _lock.lock();
+    try {
+      removeFromListenerMap(key, listener, _persistentListener);
+      removeFromListenerMap(key, listener, _onetimeListener);
+    } finally {
+      _lock.unlock();
+    }
+  }
+
+  /**
+   * Remove listener from the container.
+   * @param key the key to remove
+   * @param listener the listener to remove
+   * @param persistListener true if remove the persistent listener, otherwise remove the one-time listener
+   */
+  public void removeListener(String key, T listener, boolean persistListener) {
+    _lock.lock();
+    try {
+      if (persistListener) {
+        removeFromListenerMap(key, listener, _persistentListener);
+      } else {
+        removeFromListenerMap(key, listener, _onetimeListener);
+      }
+    } finally {
+      _lock.unlock();
+    }
+  }
+
+  /**
+   * Remove listeners from the container.
+   * @param key the key to remove
+   * @param listeners the listeners to remove
+   * @param persistListener true if remove the persistent listener, otherwise remove the one-time listener
+   */
+  public void removeListeners(String key, Collection<T> listeners, boolean persistListener) {
+    _lock.lock();
+    try {
+      if (persistListener) {
+        removeFromListenerMap(key, listeners, _persistentListener);
+      } else {
+        removeFromListenerMap(key, listeners, _onetimeListener);
+      }
+    } finally {
+      _lock.unlock();
+    }
+  }
+
+  /**
+   * Consume the listeners registered to the given key. The given consumer is applied to all one-time and persistent

Review Comment:
   These are great APIs, but I have a few questions:
   1. Are there scenarios where both persistent and one-time listeners would be added for the same key?
   2. If so, it would be interesting to know the behaviour of having multiple listeners for the same key. My guess is that it should be okay, but I'm just calling this out.
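
To make the mixed case concrete, here is a small model of the behaviour the Javadoc describes: the consumer is applied to both kinds of listener, and the one-time listeners are dropped afterwards. `MiniContainer` and `MixedListenersDemo` are hypothetical names, and the model is an assumption-based simplification of `ListenerContainer`, not its actual code.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Consumer;

// Hypothetical, simplified model of the container under review.
class MiniContainer<T> {
  private final ReentrantLock lock = new ReentrantLock(true);
  private final Map<String, Set<T>> persistent = new ConcurrentHashMap<>();
  private final Map<String, Set<T>> onetime = new ConcurrentHashMap<>();

  void add(String key, T listener, boolean persist) {
    lock.lock();
    try {
      (persist ? persistent : onetime)
          .computeIfAbsent(key, k -> new CopyOnWriteArraySet<>())
          .add(listener);
    } finally {
      lock.unlock();
    }
  }

  void consume(String key, Consumer<? super T> action) {
    Set<T> once;
    lock.lock();
    try {
      once = onetime.remove(key); // one-time listeners are detached here
    } finally {
      lock.unlock();
    }
    if (once != null) {
      once.forEach(action);
    }
    persistent.getOrDefault(key, Collections.emptySet()).forEach(action);
  }
}

class MixedListenersDemo {
  // Registers one listener of each kind on the same key and consumes twice.
  static List<String> run() {
    MiniContainer<String> container = new MiniContainer<>();
    container.add("/node", "persistent-listener", true);
    container.add("/node", "one-time-listener", false);
    List<String> calls = new ArrayList<>();
    container.consume("/node", l -> calls.add("first:" + l));
    container.consume("/node", l -> calls.add("second:" + l));
    return calls;
  }
}
```

In this model the first consume reaches both listeners and the second reaches only the persistent one. Because the two maps are independent, registering the same listener object as both persistent and one-time would invoke it twice on the first consume.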



##########
zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/ListenerContainer.java:
##########
@@ -0,0 +1,186 @@
+package org.apache.helix.zookeeper.zkclient;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import com.google.common.annotations.VisibleForTesting;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CopyOnWriteArraySet;
+import java.util.concurrent.locks.ReentrantLock;
+import java.util.function.Consumer;
+
+
+/**
+ * A thread-safe container wrapper class for Listeners registered to ZkClient.
+ * It stores 2 types of listener separately, one-time listener and persistent listener. The former ones are removed
+ * right after its consumption, while the latter need to be explicitly removed. 
+ * @param <T> the type of listener
+ */
+public class ListenerContainer<T> {
+  private final ReentrantLock _lock = new ReentrantLock(true);
+  private final Map<String, Set<T>> _persistentListener = new ConcurrentHashMap<>();
+  private final Map<String, Set<T>> _onetimeListener = new ConcurrentHashMap<>();
+
+  /**
+   * Add listener to the container with specified key.
+   * @param key the key to register to
+   * @param listener the listener to register
+   * @param persistListener true if the listener is persistent
+   */
+  public void addListener(String key, T listener, boolean persistListener) {
+    addListener(key, listener, persistListener ? _persistentListener : _onetimeListener);
+  }
+
+  /**
+   * Remove listener from the container.
+   * This operation removes both one-time and persistent listener from the specified key.
+   * @param key the key to remove
+   * @param listener the listener to remove
+   */
+  public void removeListener(String key, T listener) {
+    _lock.lock();
+    try {
+      removeFromListenerMap(key, listener, _persistentListener);
+      removeFromListenerMap(key, listener, _onetimeListener);
+    } finally {
+      _lock.unlock();
+    }
+  }
+
+  /**
+   * Remove listener from the container.
+   * @param key the key to remove
+   * @param listener the listener to remove
+   * @param persistListener true if remove the persistent listener, otherwise remove the one-time listener
+   */
+  public void removeListener(String key, T listener, boolean persistListener) {
+    _lock.lock();
+    try {
+      if (persistListener) {
+        removeFromListenerMap(key, listener, _persistentListener);
+      } else {
+        removeFromListenerMap(key, listener, _onetimeListener);
+      }
+    } finally {
+      _lock.unlock();
+    }
+  }
+
+  /**
+   * Remove listeners from the container.
+   * @param key the key to remove
+   * @param listeners the listeners to remove
+   * @param persistListener true if remove the persistent listener, otherwise remove the one-time listener
+   */
+  public void removeListeners(String key, Collection<T> listeners, boolean persistListener) {
+    _lock.lock();
+    try {
+      if (persistListener) {
+        removeFromListenerMap(key, listeners, _persistentListener);
+      } else {
+        removeFromListenerMap(key, listeners, _onetimeListener);
+      }
+    } finally {
+      _lock.unlock();
+    }
+  }
+
+  /**
+   * Consume the listeners registered to the given key. The given consumer is applied to all one-time and persistent
+   * listeners with the given key.
+   * The one-time listeners are removed after this operation, as a result, this operation is NOT idempotent.
+   * @param key the key of the listeners
+   * @param consumer the consuming action for the listeners
+   */
+  public void consumeListeners(String key, Consumer<? super T> consumer) {
+    Set<T> onetimeListeners;
+    Set<T> persistentListeners;
+    _lock.lock();
+    try {
+      // remove one-time listeners
+      onetimeListeners = _onetimeListener.remove(key);
+    } finally {
+      _lock.unlock();
+    }
+    if (onetimeListeners == null) {
+      onetimeListeners = Collections.emptySet();
+    }
+    persistentListeners = _persistentListener.getOrDefault(key, Collections.emptySet());

Review Comment:
   Just curious, why do we need the lock for the one-time listeners and not here? Also, we could just use getOrDefault for the one-time listeners as well, right?
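
One way to see the difference between the two calls is the sketch below, where `RemoveVsGetOrDefault` is a hypothetical class. `getOrDefault` only reads, so the entry stays registered, whereas `remove` reads and detaches the entry in one step. As far as the quoted code shows, consume never removes persistent entries, so a concurrent add there still lands in a set that remains in the map.

```java
import java.util.Collections;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArraySet;

// Hypothetical demo class; not part of the PR.
class RemoveVsGetOrDefault {
  private static Map<String, Set<String>> mapWithOneListener() {
    Map<String, Set<String>> map = new ConcurrentHashMap<>();
    map.computeIfAbsent("/node", k -> new CopyOnWriteArraySet<>()).add("listener");
    return map;
  }

  // getOrDefault only reads: the listener stays registered and would fire again.
  static boolean registeredAfterGetOrDefault() {
    Map<String, Set<String>> map = mapWithOneListener();
    Set<String> seen = map.getOrDefault("/node", Collections.emptySet());
    return !seen.isEmpty() && map.containsKey("/node");
  }

  // remove reads and detaches in one step, which is what "one-time" needs.
  static boolean registeredAfterRemove() {
    Map<String, Set<String>> map = mapWithOneListener();
    Set<String> seen = map.remove("/node");
    return seen != null && map.containsKey("/node");
  }
}
```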





[GitHub] [helix] qqu0127 commented on a diff in pull request #2338: Implement a thread-safe listener container for zkclient

Posted by GitBox <gi...@apache.org>.
qqu0127 commented on code in PR #2338:
URL: https://github.com/apache/helix/pull/2338#discussion_r1068378381


##########
zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/ListenerContainer.java:
##########
@@ -0,0 +1,186 @@
+package org.apache.helix.zookeeper.zkclient;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import com.google.common.annotations.VisibleForTesting;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CopyOnWriteArraySet;
+import java.util.concurrent.locks.ReentrantLock;
+import java.util.function.Consumer;
+
+
+/**
+ * A thread-safe container wrapper class for Listeners registered to ZkClient.
+ * It stores 2 types of listener separately, one-time listener and persistent listener. The former ones are removed
+ * right after its consumption, while the latter need to be explicitly removed. 
+ * @param <T> the type of listener
+ */
+public class ListenerContainer<T> {
+  private final ReentrantLock _lock = new ReentrantLock(true);
+  private final Map<String, Set<T>> _persistentListener = new ConcurrentHashMap<>();
+  private final Map<String, Set<T>> _onetimeListener = new ConcurrentHashMap<>();
+
+  /**
+   * Add listener to the container with specified key.
+   * @param key the key to register to
+   * @param listener the listener to register
+   * @param persistListener true if the listener is persistent
+   */
+  public void addListener(String key, T listener, boolean persistListener) {
+    addListener(key, listener, persistListener ? _persistentListener : _onetimeListener);
+  }
+
+  /**
+   * Remove listener from the container.
+   * This operation removes both one-time and persistent listener from the specified key.
+   * @param key the key to remove
+   * @param listener the listener to remove
+   */
+  public void removeListener(String key, T listener) {
+    _lock.lock();
+    try {
+      removeFromListenerMap(key, listener, _persistentListener);
+      removeFromListenerMap(key, listener, _onetimeListener);
+    } finally {
+      _lock.unlock();
+    }
+  }
+
+  /**
+   * Remove listener from the container.
+   * @param key the key to remove
+   * @param listener the listener to remove
+   * @param persistListener true if remove the persistent listener, otherwise remove the one-time listener
+   */
+  public void removeListener(String key, T listener, boolean persistListener) {
+    _lock.lock();
+    try {
+      if (persistListener) {
+        removeFromListenerMap(key, listener, _persistentListener);
+      } else {
+        removeFromListenerMap(key, listener, _onetimeListener);
+      }
+    } finally {
+      _lock.unlock();
+    }
+  }
+
+  /**
+   * Remove listeners from the container.
+   * @param key the key to remove
+   * @param listeners the listeners to remove
+   * @param persistListener true if remove the persistent listener, otherwise remove the one-time listener
+   */
+  public void removeListeners(String key, Collection<T> listeners, boolean persistListener) {
+    _lock.lock();
+    try {
+      if (persistListener) {
+        removeFromListenerMap(key, listeners, _persistentListener);
+      } else {
+        removeFromListenerMap(key, listeners, _onetimeListener);
+      }
+    } finally {
+      _lock.unlock();
+    }
+  }
+
+  /**
+   * Consume the listeners registered to the given key. The given consumer is applied to all one-time and persistent
+   * listeners with the given key.
+   * The one-time listeners are removed after this operation, as a result, this operation is NOT idempotent.
+   * @param key the key of the listeners
+   * @param consumer the consuming action for the listeners
+   */
+  public void consumeListeners(String key, Consumer<? super T> consumer) {
+    Set<T> onetimeListeners;
+    Set<T> persistentListeners;
+    _lock.lock();
+    try {
+      // remove one-time listeners
+      onetimeListeners = _onetimeListener.remove(key);

Review Comment:
   Good question. I think there is a subtle race condition between `consume` and `add`. Suppose thread A adds a one-time listener T while thread B consumes the key, as in line 122. If the timing is unfortunate, B may take its snapshot of `onetimeListeners` before A completes, so the snapshot doesn't contain T. But because the map entry is removed together with its `CopyOnWriteArraySet` (COWAS) instance, the new listener T is lost.
   (The unit test fails if we remove the lock.)
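
   The interleaving described in this comment can be replayed deterministically on a single thread. This is only a sketch, not the code under review: it assumes that adding a listener means resolving the per-key set with `computeIfAbsent` and then calling `add` on it without the lock, and the class and method names (`LostListenerDemo`, `replayUnluckyInterleaving`) are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArraySet;

// Hypothetical replay of the add/consume race; not the code under review.
class LostListenerDemo {
  // Returns true if listener "T" is neither notified nor still registered.
  static boolean replayUnluckyInterleaving() {
    Map<String, Set<String>> onetime = new ConcurrentHashMap<>();
    String key = "/path";

    // Thread A, step 1: look up (or create) the set for the key.
    Set<String> setSeenByA = onetime.computeIfAbsent(key, k -> new CopyOnWriteArraySet<>());

    // Thread B: detach the whole entry and take its snapshot of the listeners.
    Set<String> detached = onetime.remove(key);
    List<String> notified = new ArrayList<>(detached);

    // Thread A, step 2: add T to a set that is no longer reachable from the map.
    setSeenByA.add("T");

    boolean wasNotified = notified.contains("T");
    boolean stillRegistered = onetime.containsKey(key);
    return !wasNotified && !stillRegistered;
  }

  public static void main(String[] args) {
    System.out.println("listener lost: " + replayUnluckyInterleaving());
  }
}
```

   Holding the same lock around both the add and the `remove(key)` rules out this ordering, because step 1 and step 2 of thread A can then no longer straddle thread B's removal.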





[GitHub] [helix] qqu0127 commented on a diff in pull request #2338: Implement a thread-safe listener container for zkclient

Posted by "qqu0127 (via GitHub)" <gi...@apache.org>.
qqu0127 commented on code in PR #2338:
URL: https://github.com/apache/helix/pull/2338#discussion_r1082973260


##########
zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/ListenerContainer.java:
##########
@@ -0,0 +1,186 @@
+package org.apache.helix.zookeeper.zkclient;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import com.google.common.annotations.VisibleForTesting;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CopyOnWriteArraySet;
+import java.util.concurrent.locks.ReentrantLock;
+import java.util.function.Consumer;
+
+
+/**
+ * A thread-safe container wrapper class for Listeners registered to ZkClient.
+ * It stores 2 types of listener separately, one-time listener and persistent listener. The former ones are removed
+ * right after its consumption, while the latter need to be explicitly removed. 
+ * @param <T> the type of listener
+ */
+public class ListenerContainer<T> {
+  private final ReentrantLock _lock = new ReentrantLock(true);
+  private final Map<String, Set<T>> _persistentListener = new ConcurrentHashMap<>();
+  private final Map<String, Set<T>> _onetimeListener = new ConcurrentHashMap<>();
+
+  /**
+   * Add listener to the container with specified key.
+   * @param key the key to register to
+   * @param listener the listener to register
+   * @param persistListener true if the listener is persistent
+   */
+  public void addListener(String key, T listener, boolean persistListener) {
+    addListener(key, listener, persistListener ? _persistentListener : _onetimeListener);
+  }
+
+  /**
+   * Remove listener from the container.
+   * This operation removes both one-time and persistent listener from the specified key.
+   * @param key the key to remove
+   * @param listener the listener to remove
+   */
+  public void removeListener(String key, T listener) {
+    _lock.lock();
+    try {
+      removeFromListenerMap(key, listener, _persistentListener);
+      removeFromListenerMap(key, listener, _onetimeListener);
+    } finally {
+      _lock.unlock();
+    }
+  }
+
+  /**
+   * Remove listener from the container.
+   * @param key the key to remove
+   * @param listener the listener to remove
+   * @param persistListener true if remove the persistent listener, otherwise remove the one-time listener
+   */
+  public void removeListener(String key, T listener, boolean persistListener) {
+    _lock.lock();
+    try {
+      if (persistListener) {
+        removeFromListenerMap(key, listener, _persistentListener);
+      } else {
+        removeFromListenerMap(key, listener, _onetimeListener);
+      }
+    } finally {
+      _lock.unlock();
+    }
+  }
+
+  /**
+   * Remove listeners from the container.
+   * @param key the key to remove
+   * @param listeners the listeners to remove
+   * @param persistListener true if remove the persistent listener, otherwise remove the one-time listener
+   */
+  public void removeListeners(String key, Collection<T> listeners, boolean persistListener) {
+    _lock.lock();
+    try {
+      if (persistListener) {
+        removeFromListenerMap(key, listeners, _persistentListener);
+      } else {
+        removeFromListenerMap(key, listeners, _onetimeListener);
+      }
+    } finally {
+      _lock.unlock();
+    }
+  }
+
+  /**
+   * Consume the listeners registered to the given key. The given consumer is applied to all one-time and persistent

Review Comment:
   1. Yes, it's possible; we don't block that.
   2. For the same key there is a set union operation, so it's only triggered once.
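
   A minimal sketch of the set-union behavior, reading point 2 as referring to a listener that is registered under both types for the same key (an assumption, since the question is not quoted here). `UnionOnceDemo` and its methods are hypothetical names, and deduplication relies on the listener's `equals`/`hashCode`.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.function.Consumer;

// Hypothetical illustration of merging both listener sets before notifying.
class UnionOnceDemo {
  // Applies the consumer to the union of both sets, so duplicates collapse.
  static <T> void consumeUnion(Set<T> onetime, Set<T> persistent, Consumer<? super T> consumer) {
    Set<T> union = new HashSet<>(onetime);
    union.addAll(persistent);
    union.forEach(consumer);
  }

  // Registers "listenerA" under both types and counts how often listeners run.
  static int countInvocations() {
    Set<String> onetime = Collections.singleton("listenerA");
    Set<String> persistent = new HashSet<>(Arrays.asList("listenerA", "listenerB"));
    List<String> invoked = new ArrayList<>();
    consumeUnion(onetime, persistent, invoked::add);
    return invoked.size();
  }
}
```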





[GitHub] [helix] mgao0 commented on a diff in pull request #2338: Implement a thread-safe listener container for zkclient

Posted by "mgao0 (via GitHub)" <gi...@apache.org>.
mgao0 commented on code in PR #2338:
URL: https://github.com/apache/helix/pull/2338#discussion_r1084648865


##########
zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/ListenerContainer.java:
##########
@@ -0,0 +1,161 @@
+package org.apache.helix.zookeeper.zkclient;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import com.google.common.annotations.VisibleForTesting;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CopyOnWriteArraySet;
+import java.util.function.Consumer;
+
+
+/**
+ * A thread-safe container wrapper class for Listeners registered to ZkClient.
+ * It stores 2 types of listener separately, one-time listener and persistent listener. The former ones are removed
+ * right after its consumption, while the latter need to be explicitly removed.
+ * @param <T> the type of listener
+ */
+public class ListenerContainer<T> {
+  private final Map<String, Set<T>> _persistentListener = new ConcurrentHashMap<>();
+  private final Map<String, Set<T>> _onetimeListener = new ConcurrentHashMap<>();
+
+  /**
+   * Add listener to the container with specified key.
+   * @param key the key to register to
+   * @param listener the listener to register
+   * @param persistListener true if the listener is persistent
+   */
+  public void addListener(String key, T listener, boolean persistListener) {
+    addListener(key, listener, persistListener ? _persistentListener : _onetimeListener);
+  }
+
+  /**
+   * Remove listener from the container.
+   * This operation removes both one-time and persistent listener from the specified key.
+   * @param key the key to remove
+   * @param listener the listener to remove
+   */
+  public synchronized void removeListener(String key, T listener) {
+    removeFromListenerMap(key, listener, _persistentListener);
+    removeFromListenerMap(key, listener, _onetimeListener);
+  }
+
+  /**
+   * Remove listener from the container.
+   * @param key the key to remove
+   * @param listener the listener to remove
+   * @param persistListener true if remove the persistent listener, otherwise remove the one-time listener
+   */
+  public synchronized void removeListener(String key, T listener, boolean persistListener) {
+    if (persistListener) {
+      removeFromListenerMap(key, listener, _persistentListener);
+    } else {
+      removeFromListenerMap(key, listener, _onetimeListener);
+    }
+  }
+
+  /**
+   * Remove listeners from the container.
+   * @param key the key to remove
+   * @param listeners the listeners to remove
+   * @param persistListener true if remove the persistent listener, otherwise remove the one-time listener
+   */
+  public synchronized void removeListeners(String key, Collection<T> listeners, boolean persistListener) {
+    if (persistListener) {
+      removeFromListenerMap(key, listeners, _persistentListener);
+    } else {
+      removeFromListenerMap(key, listeners, _onetimeListener);
+    }
+  }
+
+  /**
+   * Consume the listeners registered to the given key. The given consumer is applied to all one-time and persistent
+   * listeners with the given key.
+   * The one-time listeners are removed after this operation, as a result, this operation is NOT idempotent.
+   * @param key the key of the listeners
+   * @param consumer the consuming action for the listeners
+   */
+  public void consumeListeners(String key, Consumer<? super T> consumer) {
+    Set<T> onetimeListeners;
+    Set<T> persistentListeners;
+    synchronized (this) {
+      // remove one-time listeners
+      onetimeListeners = _onetimeListener.remove(key);
+    }
+    if (onetimeListeners == null) {

Review Comment:
   I feel that lines 106 to 114 can be simplified: add the listeners to the result set only if they are non-null, so there's no need to instantiate an empty set when no listener is registered.
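
   One way the suggested simplification could look, as a sketch only. `SimplifiedContainer` is a hypothetical stand-in reduced to the add and consume paths; its field names differ from the PR, and the `synchronized` on `add` is an assumption made so that the example is safe on its own.

```java
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.function.Consumer;

// Hypothetical stand-in for ListenerContainer, reduced to add and consume.
class SimplifiedContainer<T> {
  private final Map<String, Set<T>> persistent = new ConcurrentHashMap<>();
  private final Map<String, Set<T>> onetime = new ConcurrentHashMap<>();

  synchronized void add(String key, T listener, boolean persist) {
    (persist ? persistent : onetime)
        .computeIfAbsent(key, k -> new CopyOnWriteArraySet<>())
        .add(listener);
  }

  void consume(String key, Consumer<? super T> consumer) {
    Set<T> toNotify = new HashSet<>();
    Set<T> removed;
    synchronized (this) {
      // One-time listeners are detached from the map here.
      removed = onetime.remove(key);
    }
    // Only non-null sets are merged; no empty-set placeholders are needed.
    if (removed != null) {
      toNotify.addAll(removed);
    }
    Set<T> kept = persistent.get(key);
    if (kept != null) {
      toNotify.addAll(kept);
    }
    toNotify.forEach(consumer);
  }
}
```

   The null checks replace both `Collections.emptySet()` fallbacks, at the cost of always allocating the result `HashSet`.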





[GitHub] [helix] qqu0127 closed pull request #2338: Implement a thread-safe listener container for zkclient

Posted by "qqu0127 (via GitHub)" <gi...@apache.org>.
qqu0127 closed pull request #2338: Implement a thread-safe listener container for zkclient
URL: https://github.com/apache/helix/pull/2338




[GitHub] [helix] qqu0127 commented on a diff in pull request #2338: Implement a thread-safe listener container for zkclient

Posted by GitBox <gi...@apache.org>.
qqu0127 commented on code in PR #2338:
URL: https://github.com/apache/helix/pull/2338#discussion_r1069889583


##########
zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/ListenerContainer.java:
##########
@@ -0,0 +1,186 @@
+package org.apache.helix.zookeeper.zkclient;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import com.google.common.annotations.VisibleForTesting;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CopyOnWriteArraySet;
+import java.util.concurrent.locks.ReentrantLock;
+import java.util.function.Consumer;
+
+
+/**
+ * A thread-safe container wrapper class for Listeners registered to ZkClient.
+ * It stores 2 types of listener separately, one-time listener and persistent listener. The former ones are removed
+ * right after its consumption, while the latter need to be explicitly removed. 
+ * @param <T> the type of listener
+ */
+public class ListenerContainer<T> {
+  private final ReentrantLock _lock = new ReentrantLock(true);
+  private final Map<String, Set<T>> _persistentListener = new ConcurrentHashMap<>();
+  private final Map<String, Set<T>> _onetimeListener = new ConcurrentHashMap<>();
+
+  /**
+   * Add listener to the container with specified key.
+   * @param key the key to register to
+   * @param listener the listener to register
+   * @param persistListener true if the listener is persistent
+   */
+  public void addListener(String key, T listener, boolean persistListener) {
+    addListener(key, listener, persistListener ? _persistentListener : _onetimeListener);
+  }
+
+  /**
+   * Remove listener from the container.
+   * This operation removes both one-time and persistent listener from the specified key.
+   * @param key the key to remove
+   * @param listener the listener to remove
+   */
+  public void removeListener(String key, T listener) {
+    _lock.lock();
+    try {
+      removeFromListenerMap(key, listener, _persistentListener);
+      removeFromListenerMap(key, listener, _onetimeListener);
+    } finally {
+      _lock.unlock();
+    }
+  }
+
+  /**
+   * Remove listener from the container.
+   * @param key the key to remove
+   * @param listener the listener to remove
+   * @param persistListener true if remove the persistent listener, otherwise remove the one-time listener
+   */
+  public void removeListener(String key, T listener, boolean persistListener) {

Review Comment:
   The method you described is already there; it removes from both entries. This one is implemented considering that some clients might want to remove only one type of listener when both types are registered.
   But I think what you said makes sense. There is very little a container can do other than hope the clients are good citizens.





[GitHub] [helix] qqu0127 commented on a diff in pull request #2338: Implement a thread-safe listener container for zkclient

Posted by GitBox <gi...@apache.org>.
qqu0127 commented on code in PR #2338:
URL: https://github.com/apache/helix/pull/2338#discussion_r1068396351


##########
zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/ListenerContainer.java:
##########
@@ -0,0 +1,186 @@
+package org.apache.helix.zookeeper.zkclient;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import com.google.common.annotations.VisibleForTesting;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CopyOnWriteArraySet;
+import java.util.concurrent.locks.ReentrantLock;
+import java.util.function.Consumer;
+
+
+/**
+ * A thread-safe container wrapper class for Listeners registered to ZkClient.
+ * It stores 2 types of listener separately, one-time listener and persistent listener. The former ones are removed

Review Comment:
   No one is using it at the moment. I get the idea that we can skip certain implementations under time pressure.
   This is implemented to be compatible with the API. We chatted with Junkai offline, and I think we can just keep the code and not use this class. No existing class is using it, but I think it will still be valuable in the future.





[GitHub] [helix] qqu0127 commented on a diff in pull request #2338: Implement a thread-safe listener container for zkclient

Posted by GitBox <gi...@apache.org>.
qqu0127 commented on code in PR #2338:
URL: https://github.com/apache/helix/pull/2338#discussion_r1069643019


##########
zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/ListenerContainer.java:
##########
@@ -0,0 +1,186 @@
+package org.apache.helix.zookeeper.zkclient;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import com.google.common.annotations.VisibleForTesting;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CopyOnWriteArraySet;
+import java.util.concurrent.locks.ReentrantLock;
+import java.util.function.Consumer;
+
+
+/**
+ * A thread-safe container wrapper class for Listeners registered to ZkClient.
+ * It stores 2 types of listener separately, one-time listener and persistent listener. The former ones are removed
+ * right after its consumption, while the latter need to be explicitly removed. 
+ * @param <T> the type of listener
+ */
+public class ListenerContainer<T> {
+  private final ReentrantLock _lock = new ReentrantLock(true);
+  private final Map<String, Set<T>> _persistentListener = new ConcurrentHashMap<>();
+  private final Map<String, Set<T>> _onetimeListener = new ConcurrentHashMap<>();
+
+  /**
+   * Add listener to the container with specified key.
+   * @param key the key to register to
+   * @param listener the listener to register
+   * @param persistListener true if the listener is persistent
+   */
+  public void addListener(String key, T listener, boolean persistListener) {
+    addListener(key, listener, persistListener ? _persistentListener : _onetimeListener);
+  }
+
+  /**
+   * Remove listener from the container.
+   * This operation removes both one-time and persistent listener from the specified key.
+   * @param key the key to remove
+   * @param listener the listener to remove
+   */
+  public void removeListener(String key, T listener) {
+    _lock.lock();
+    try {
+      removeFromListenerMap(key, listener, _persistentListener);

Review Comment:
   Thanks for the review @jiajunwang!
   On your first point, this class is generic, so it can already support any type of listener.
   Second, a synchronized block might be better than synchronizing the whole method; the two should be equivalent. As for the locking itself, we need synchronization on most write operations even though the maps are concurrent hash maps. Please see the other comment for more details.
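
   To illustrate why a `ConcurrentHashMap` alone is not enough for compound writes, here is a hedged sketch. It assumes that removing a listener also drops the key once its set becomes empty, which may not match what `removeFromListenerMap` does in the PR; `GuardedListenerMap` and its methods are hypothetical.

```java
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArraySet;

// Hypothetical sketch: each map call is atomic, but a sequence of calls is not.
class GuardedListenerMap<T> {
  private final Map<String, Set<T>> listeners = new ConcurrentHashMap<>();

  synchronized void add(String key, T listener) {
    listeners.computeIfAbsent(key, k -> new CopyOnWriteArraySet<>()).add(listener);
  }

  // Removes the listener, then drops the key if its set became empty.
  // Without the shared monitor, an add() could slip in between the
  // isEmpty() check and the remove(key) call and be discarded with the set.
  synchronized void remove(String key, T listener) {
    Set<T> set = listeners.get(key);
    if (set == null) {
      return;
    }
    set.remove(listener);
    if (set.isEmpty()) {
      listeners.remove(key);
    }
  }

  synchronized boolean hasKey(String key) {
    return listeners.containsKey(key);
  }
}
```

   The guard only helps because `add` and `remove` synchronize on the same monitor; guarding one side alone would leave the same window open.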





[GitHub] [helix] qqu0127 commented on a diff in pull request #2338: Implement a thread-safe listener container for zkclient

Posted by GitBox <gi...@apache.org>.
qqu0127 commented on code in PR #2338:
URL: https://github.com/apache/helix/pull/2338#discussion_r1069638754


##########
zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/ListenerContainer.java:
##########
@@ -0,0 +1,186 @@
+package org.apache.helix.zookeeper.zkclient;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import com.google.common.annotations.VisibleForTesting;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CopyOnWriteArraySet;
+import java.util.concurrent.locks.ReentrantLock;
+import java.util.function.Consumer;
+
+
+/**
+ * A thread-safe container wrapper class for Listeners registered to ZkClient.
+ * It stores 2 types of listener separately, one-time listener and persistent listener. The former ones are removed
+ * right after its consumption, while the latter need to be explicitly removed. 
+ * @param <T> the type of listener
+ */
+public class ListenerContainer<T> {
+  private final ReentrantLock _lock = new ReentrantLock(true);
+  private final Map<String, Set<T>> _persistentListener = new ConcurrentHashMap<>();
+  private final Map<String, Set<T>> _onetimeListener = new ConcurrentHashMap<>();
+
+  /**
+   * Add listener to the container with specified key.
+   * @param key the key to register to
+   * @param listener the listener to register
+   * @param persistListener true if the listener is persistent
+   */
+  public void addListener(String key, T listener, boolean persistListener) {
+    addListener(key, listener, persistListener ? _persistentListener : _onetimeListener);
+  }
+
+  /**
+   * Remove listener from the container.
+   * This operation removes both one-time and persistent listener from the specified key.
+   * @param key the key to remove
+   * @param listener the listener to remove
+   */
+  public void removeListener(String key, T listener) {
+    _lock.lock();
+    try {
+      removeFromListenerMap(key, listener, _persistentListener);
+      removeFromListenerMap(key, listener, _onetimeListener);
+    } finally {
+      _lock.unlock();
+    }
+  }
+
+  /**
+   * Remove listener from the container.
+   * @param key the key to remove
+   * @param listener the listener to remove
+   * @param persistListener true if remove the persistent listener, otherwise remove the one-time listener
+   */
+  public void removeListener(String key, T listener, boolean persistListener) {

Review Comment:
   Well, the class itself is generic, so it has the type information at least at compile time. And this method removes the specified listener from the container. It's not passing the type around (if you look at the signature again).


