You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by xy...@apache.org on 2023/04/06 06:11:27 UTC
[helix] branch metaclient updated: Refactor ZkClient for persist watch (#2426)
This is an automated email from the ASF dual-hosted git repository.
xyuanlu pushed a commit to branch metaclient
in repository https://gitbox.apache.org/repos/asf/helix.git
The following commit(s) were added to refs/heads/metaclient by this push:
new 3ddd4894f Refactor ZkClient for persist watch (#2426)
3ddd4894f is described below
commit 3ddd4894f6cba28cf10af0a528e108646b990f15
Author: xyuanlu <xy...@gmail.com>
AuthorDate: Wed Apr 5 23:11:22 2023 -0700
Refactor ZkClient for persist watch (#2426)
Refactor ZkClient for persist watch
---
.../helix/zookeeper/zkclient/IZkConnection.java | 5 ++
.../apache/helix/zookeeper/zkclient/ZkClient.java | 92 +++++++++++++---------
.../helix/zookeeper/zkclient/ZkConnection.java | 13 +++
3 files changed, 73 insertions(+), 37 deletions(-)
diff --git a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/IZkConnection.java b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/IZkConnection.java
index e766bf7d9..bd94432b4 100644
--- a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/IZkConnection.java
+++ b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/IZkConnection.java
@@ -21,6 +21,7 @@ package org.apache.helix.zookeeper.zkclient;
import java.util.List;
+import org.apache.zookeeper.AddWatchMode;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.Op;
@@ -63,4 +64,8 @@ public interface IZkConnection {
public List<OpResult> multi(Iterable<Op> ops) throws KeeperException, InterruptedException;
public void addAuthInfo(String scheme, byte[] auth);
+
+ public void addWatch(String basePath, Watcher watcher, AddWatchMode mode) throws KeeperException,
+ InterruptedException;
+ public void removeWatches(String path, Watcher watcher, Watcher.WatcherType watcherType) throws InterruptedException, KeeperException;
}
\ No newline at end of file
diff --git a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/ZkClient.java b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/ZkClient.java
index f87edda2b..dbb73864b 100644
--- a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/ZkClient.java
+++ b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/ZkClient.java
@@ -76,6 +76,7 @@ import org.apache.zookeeper.data.Stat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+
/**
* "Native ZkClient": not to be used directly.
*
@@ -259,14 +260,9 @@ public class ZkClient implements Watcher {
}
public ChildrenSubscribeResult subscribeChildChanges(String path, IZkChildListener listener, boolean skipWatchingNonExistNode) {
- synchronized (_childListener) {
- Set<IZkChildListener> listeners = _childListener.get(path);
- if (listeners == null) {
- listeners = new CopyOnWriteArraySet<>();
- _childListener.put(path, listeners);
+ synchronized (_childListener) {
+ addChildListener(path, listener);
}
- listeners.add(listener);
- }
List<String> children = watchForChilds(path, skipWatchingNonExistNode);
if (children == null && skipWatchingNonExistNode) {
@@ -279,32 +275,15 @@ public class ZkClient implements Watcher {
}
public void unsubscribeChildChanges(String path, IZkChildListener childListener) {
- synchronized (_childListener) {
- final Set<IZkChildListener> listeners = _childListener.get(path);
- if (listeners != null) {
- listeners.remove(childListener);
- }
+ synchronized (_childListener) {
+ removeChildListener(path, childListener);
}
}
public boolean subscribeDataChanges(String path, IZkDataListener listener, boolean skipWatchingNonExistNode) {
- Set<IZkDataListenerEntry> listenerEntries;
- synchronized (_dataListener) {
- listenerEntries = _dataListener.get(path);
- if (listenerEntries == null) {
- listenerEntries = new CopyOnWriteArraySet<>();
- _dataListener.put(path, listenerEntries);
- }
- boolean prefetchEnabled = isPrefetchEnabled(listener);
- IZkDataListenerEntry listenerEntry = new IZkDataListenerEntry(listener, prefetchEnabled);
- listenerEntries.add(listenerEntry);
- if (prefetchEnabled) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("zkclient {} subscribed data changes for {}, listener {}, prefetch data {}",
- _uid, path, listener, prefetchEnabled);
- }
- }
+ synchronized (_dataListener) {
+ addDataListener(path, listener);
}
boolean watchInstalled = watchForData(path, skipWatchingNonExistNode);
@@ -355,16 +334,9 @@ public class ZkClient implements Watcher {
}
public void unsubscribeDataChanges(String path, IZkDataListener dataListener) {
- synchronized (_dataListener) {
- final Set<IZkDataListenerEntry> listeners = _dataListener.get(path);
- if (listeners != null) {
- IZkDataListenerEntry listenerEntry = new IZkDataListenerEntry(dataListener);
- listeners.remove(listenerEntry);
+ synchronized (_dataListener) {
+ removeDataListener(path, dataListener);
}
- if (listeners == null || listeners.isEmpty()) {
- _dataListener.remove(path);
- }
- }
}
public void subscribeStateChanges(final IZkStateListener listener) {
@@ -2907,4 +2879,50 @@ public class ZkClient implements Watcher {
+ " is greater than ZkClient size limit " + WRITE_SIZE_LIMIT);
}
}
+
+ private void addDataListener(String path, IZkDataListener listener) {
+ Set<IZkDataListenerEntry> listenerEntries;
+ listenerEntries = _dataListener.get(path);
+ if (listenerEntries == null) {
+ listenerEntries = new CopyOnWriteArraySet<>();
+ _dataListener.put(path, listenerEntries);
+ }
+
+ boolean prefetchEnabled = isPrefetchEnabled(listener);
+ IZkDataListenerEntry listenerEntry = new IZkDataListenerEntry(listener, prefetchEnabled);
+ listenerEntries.add(listenerEntry);
+ if (prefetchEnabled) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("zkclient {} subscribed data changes for {}, listener {}, prefetch data {}", _uid,
+ path, listener, prefetchEnabled);
+ }
+ }
+ }
+
+ private void removeDataListener(String path, IZkDataListener dataListener) {
+ final Set<IZkDataListenerEntry> listeners = _dataListener.get(path);
+ if (listeners != null) {
+ IZkDataListenerEntry listenerEntry = new IZkDataListenerEntry(dataListener);
+ listeners.remove(listenerEntry);
+ }
+ if (listeners == null || listeners.isEmpty()) {
+ _dataListener.remove(path);
+ }
+ }
+
+ private void addChildListener(String path, IZkChildListener listener) {
+ Set<IZkChildListener> listeners = _childListener.get(path);
+ if (listeners == null) {
+ listeners = new CopyOnWriteArraySet<>();
+ _childListener.put(path, listeners);
+ }
+ listeners.add(listener);
+ }
+
+ private void removeChildListener(String path, IZkChildListener listener) {
+ final Set<IZkChildListener> listeners = _childListener.get(path);
+ if (listeners != null) {
+ listeners.remove(listener);
+ }
+ }
}
diff --git a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/ZkConnection.java b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/ZkConnection.java
index 01935919c..2f7ac27de 100644
--- a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/ZkConnection.java
+++ b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/ZkConnection.java
@@ -29,6 +29,7 @@ import java.util.concurrent.locks.ReentrantLock;
import com.google.common.annotations.VisibleForTesting;
import org.apache.helix.zookeeper.constant.ZkSystemPropertyKeys;
import org.apache.helix.zookeeper.zkclient.exception.ZkException;
+import org.apache.zookeeper.AddWatchMode;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.Op;
@@ -252,6 +253,18 @@ public class ZkConnection implements IZkConnection {
_getChildrenMethod.getName());
}
+ @Override
+ public void addWatch(String basePath, Watcher watcher, AddWatchMode mode)
+ throws KeeperException, InterruptedException {
+ _zk.addWatch(basePath, watcher, mode);
+ }
+
+ @Override
+ public void removeWatches(String path, Watcher watcher, Watcher.WatcherType watcherType)
+ throws InterruptedException, KeeperException {
+ _zk.removeWatches(path, watcher, watcherType, true);
+ }
+
private Method doLookUpGetChildrenMethod() {
if (!GETCHILDREN_PAGINATION_DISABLED) {
try {