You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by xy...@apache.org on 2023/04/06 06:11:27 UTC

[helix] branch metaclient updated: Refactor ZkClient for persist watch (#2426)

This is an automated email from the ASF dual-hosted git repository.

xyuanlu pushed a commit to branch metaclient
in repository https://gitbox.apache.org/repos/asf/helix.git


The following commit(s) were added to refs/heads/metaclient by this push:
     new 3ddd4894f Refactor ZkClient for persist watch  (#2426)
3ddd4894f is described below

commit 3ddd4894f6cba28cf10af0a528e108646b990f15
Author: xyuanlu <xy...@gmail.com>
AuthorDate: Wed Apr 5 23:11:22 2023 -0700

    Refactor ZkClient for persist watch  (#2426)
    
    Refactor ZkClient for persist watch
---
 .../helix/zookeeper/zkclient/IZkConnection.java    |  5 ++
 .../apache/helix/zookeeper/zkclient/ZkClient.java  | 92 +++++++++++++---------
 .../helix/zookeeper/zkclient/ZkConnection.java     | 13 +++
 3 files changed, 73 insertions(+), 37 deletions(-)

diff --git a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/IZkConnection.java b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/IZkConnection.java
index e766bf7d9..bd94432b4 100644
--- a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/IZkConnection.java
+++ b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/IZkConnection.java
@@ -21,6 +21,7 @@ package org.apache.helix.zookeeper.zkclient;
 
 import java.util.List;
 
+import org.apache.zookeeper.AddWatchMode;
 import org.apache.zookeeper.CreateMode;
 import org.apache.zookeeper.KeeperException;
 import org.apache.zookeeper.Op;
@@ -63,4 +64,8 @@ public interface IZkConnection {
     public List<OpResult> multi(Iterable<Op> ops) throws KeeperException, InterruptedException;
 
     public void addAuthInfo(String scheme, byte[] auth);
+
+    public void addWatch(String basePath, Watcher watcher, AddWatchMode mode) throws KeeperException,
+                                                                                     InterruptedException;
+    public void removeWatches(String path, Watcher watcher, Watcher.WatcherType watcherType) throws InterruptedException, KeeperException;
 }
\ No newline at end of file
diff --git a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/ZkClient.java b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/ZkClient.java
index f87edda2b..dbb73864b 100644
--- a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/ZkClient.java
+++ b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/ZkClient.java
@@ -76,6 +76,7 @@ import org.apache.zookeeper.data.Stat;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+
 /**
  * "Native ZkClient": not to be used directly.
  *
@@ -259,14 +260,9 @@ public class ZkClient implements Watcher {
   }
 
   public ChildrenSubscribeResult subscribeChildChanges(String path, IZkChildListener listener, boolean skipWatchingNonExistNode) {
-    synchronized (_childListener) {
-      Set<IZkChildListener> listeners = _childListener.get(path);
-      if (listeners == null) {
-        listeners = new CopyOnWriteArraySet<>();
-        _childListener.put(path, listeners);
+      synchronized (_childListener) {
+        addChildListener(path, listener);
       }
-      listeners.add(listener);
-    }
 
     List<String> children = watchForChilds(path, skipWatchingNonExistNode);
     if (children == null && skipWatchingNonExistNode) {
@@ -279,32 +275,15 @@ public class ZkClient implements Watcher {
   }
 
   public void unsubscribeChildChanges(String path, IZkChildListener childListener) {
-    synchronized (_childListener) {
-      final Set<IZkChildListener> listeners = _childListener.get(path);
-      if (listeners != null) {
-        listeners.remove(childListener);
-      }
+      synchronized (_childListener) {
+        removeChildListener(path, childListener);
     }
   }
 
   public boolean subscribeDataChanges(String path, IZkDataListener listener, boolean skipWatchingNonExistNode) {
-    Set<IZkDataListenerEntry> listenerEntries;
-    synchronized (_dataListener) {
-      listenerEntries = _dataListener.get(path);
-      if (listenerEntries == null) {
-        listenerEntries = new CopyOnWriteArraySet<>();
-        _dataListener.put(path, listenerEntries);
-      }
 
-      boolean prefetchEnabled = isPrefetchEnabled(listener);
-      IZkDataListenerEntry listenerEntry = new IZkDataListenerEntry(listener, prefetchEnabled);
-      listenerEntries.add(listenerEntry);
-      if (prefetchEnabled) {
-        if (LOG.isDebugEnabled()) {
-          LOG.debug("zkclient {} subscribed data changes for {}, listener {}, prefetch data {}",
-              _uid, path, listener, prefetchEnabled);
-        }
-      }
+      synchronized (_dataListener) {
+        addDataListener(path, listener);
     }
 
     boolean watchInstalled = watchForData(path, skipWatchingNonExistNode);
@@ -355,16 +334,9 @@ public class ZkClient implements Watcher {
   }
 
   public void unsubscribeDataChanges(String path, IZkDataListener dataListener) {
-    synchronized (_dataListener) {
-      final Set<IZkDataListenerEntry> listeners = _dataListener.get(path);
-      if (listeners != null) {
-        IZkDataListenerEntry listenerEntry = new IZkDataListenerEntry(dataListener);
-        listeners.remove(listenerEntry);
+      synchronized (_dataListener) {
+        removeDataListener(path, dataListener);
       }
-      if (listeners == null || listeners.isEmpty()) {
-        _dataListener.remove(path);
-      }
-    }
   }
 
   public void subscribeStateChanges(final IZkStateListener listener) {
@@ -2907,4 +2879,50 @@ public class ZkClient implements Watcher {
           + " is greater than ZkClient size limit " + WRITE_SIZE_LIMIT);
     }
   }
+
+  /**
+   * Records {@code listener} for data changes on {@code path}, creating the path's entry set
+   * on first use.
+   * subscribeDataChanges invokes this while synchronized on {@code _dataListener}.
+   */
+  private void addDataListener(String path, IZkDataListener listener) {
+    final Set<IZkDataListenerEntry> entriesForPath =
+        _dataListener.computeIfAbsent(path, unused -> new CopyOnWriteArraySet<>());
+
+    final boolean prefetch = isPrefetchEnabled(listener);
+    entriesForPath.add(new IZkDataListenerEntry(listener, prefetch));
+
+    // Only prefetch-enabled subscriptions are traced, and only at debug level.
+    if (prefetch && LOG.isDebugEnabled()) {
+      LOG.debug("zkclient {} subscribed data changes for {}, listener {}, prefetch data {}", _uid,
+          path, listener, prefetch);
+    }
+  }
+
+  /** Detaches {@code dataListener} from {@code path}; forgets the path once it is empty. */
+  private void removeDataListener(String path, IZkDataListener dataListener) {
+    final Set<IZkDataListenerEntry> entriesForPath = _dataListener.get(path);
+    if (entriesForPath != null) {
+      entriesForPath.remove(new IZkDataListenerEntry(dataListener));
+    }
+    if (entriesForPath == null || entriesForPath.isEmpty()) {
+      _dataListener.remove(path);
+    }
+  }
+
+  /**
+   * Registers {@code listener} for child changes on {@code path}, creating the path's listener
+   * set on first use. subscribeChildChanges calls this while synchronized on _childListener.
+   */
+  private void addChildListener(String path, IZkChildListener listener) {
+    _childListener.computeIfAbsent(path, unused -> new CopyOnWriteArraySet<>())
+        .add(listener);
+  }
+
+  private void removeChildListener(String path, IZkChildListener listener) {
+    final Set<IZkChildListener> registered = _childListener.get(path);
+    if (registered != null) { // nothing to detach when the path has no listener set
+      registered.remove(listener); // NOTE(review): emptied set stays mapped - intended?
+    }
+  }
 }
diff --git a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/ZkConnection.java b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/ZkConnection.java
index 01935919c..2f7ac27de 100644
--- a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/ZkConnection.java
+++ b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/ZkConnection.java
@@ -29,6 +29,7 @@ import java.util.concurrent.locks.ReentrantLock;
 import com.google.common.annotations.VisibleForTesting;
 import org.apache.helix.zookeeper.constant.ZkSystemPropertyKeys;
 import org.apache.helix.zookeeper.zkclient.exception.ZkException;
+import org.apache.zookeeper.AddWatchMode;
 import org.apache.zookeeper.CreateMode;
 import org.apache.zookeeper.KeeperException;
 import org.apache.zookeeper.Op;
@@ -252,6 +253,18 @@ public class ZkConnection implements IZkConnection {
         _getChildrenMethod.getName());
   }
 
+  @Override
+  public void addWatch(String basePath, Watcher watcher, AddWatchMode mode)
+      throws KeeperException, InterruptedException {
+    _zk.addWatch(basePath, watcher, mode); // forwards all arguments to _zk unchanged
+  }
+
+  @Override
+  public void removeWatches(String path, Watcher watcher, Watcher.WatcherType watcherType)
+      throws InterruptedException, KeeperException {
+    _zk.removeWatches(path, watcher, watcherType, true); // true: presumably "local"; confirm
+  }
+
   private Method doLookUpGetChildrenMethod() {
     if (!GETCHILDREN_PAGINATION_DISABLED) {
       try {