You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by xy...@apache.org on 2023/04/27 20:36:42 UTC

[helix] 33/37: Add an option in metaclient to use persist watcher (#2434)

This is an automated email from the ASF dual-hosted git repository.

xyuanlu pushed a commit to branch metaclient
in repository https://gitbox.apache.org/repos/asf/helix.git

commit 1e0914aaa55c0c697e790385888564bbb6fe876e
Author: xyuanlu <xy...@gmail.com>
AuthorDate: Mon Apr 10 09:09:39 2023 -0700

    Add an option in metaclient to use persist watcher (#2434)
---
 .../helix/metaclient/impl/zk/ZkMetaClient.java      |  2 +-
 .../helix/zookeeper/impl/client/ZkClient.java       | 21 ++++++++++++++-------
 .../apache/helix/zookeeper/zkclient/ZkClient.java   |  6 ++++--
 3 files changed, 19 insertions(+), 10 deletions(-)

diff --git a/meta-client/src/main/java/org/apache/helix/metaclient/impl/zk/ZkMetaClient.java b/meta-client/src/main/java/org/apache/helix/metaclient/impl/zk/ZkMetaClient.java
index 84c329fe4..af31423d0 100644
--- a/meta-client/src/main/java/org/apache/helix/metaclient/impl/zk/ZkMetaClient.java
+++ b/meta-client/src/main/java/org/apache/helix/metaclient/impl/zk/ZkMetaClient.java
@@ -91,7 +91,7 @@ public class ZkMetaClient<T> implements MetaClientInterface<T>, AutoCloseable {
         new ZkConnection(config.getConnectionAddress(), (int) config.getSessionTimeoutInMillis()),
         (int) _initConnectionTimeout, _reconnectTimeout /*use reconnect timeout for retry timeout*/,
         config.getZkSerializer(), config.getMonitorType(), config.getMonitorKey(),
-        config.getMonitorInstanceName(), config.getMonitorRootPathOnly(), false);
+        config.getMonitorInstanceName(), config.getMonitorRootPathOnly(), false, true);
     _zkClientReconnectMonitor = Executors.newSingleThreadScheduledExecutor();
     _reconnectStateChangeListener = new ReconnectStateChangeListener();
   }
diff --git a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/impl/client/ZkClient.java b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/impl/client/ZkClient.java
index f3742b55b..1dd601c21 100644
--- a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/impl/client/ZkClient.java
+++ b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/impl/client/ZkClient.java
@@ -91,19 +91,19 @@ public class ZkClient extends org.apache.helix.zookeeper.zkclient.ZkClient imple
    *                      explicitly before talking to ZK.
    */
   public ZkClient(IZkConnection zkConnection, int connectionTimeout, long operationRetryTimeout,
-      PathBasedZkSerializer zkSerializer,
-      String monitorType, String monitorKey, String monitorInstanceName,
-      boolean monitorRootPathOnly, boolean connectOnInit) {
+      PathBasedZkSerializer zkSerializer, String monitorType, String monitorKey,
+      String monitorInstanceName, boolean monitorRootPathOnly, boolean connectOnInit,
+      boolean usePersistWatcher) {
     super(zkConnection, connectionTimeout, operationRetryTimeout, zkSerializer, monitorType,
-        monitorKey, monitorInstanceName, monitorRootPathOnly, connectOnInit);
+        monitorKey, monitorInstanceName, monitorRootPathOnly, connectOnInit, usePersistWatcher);
   }
 
   public ZkClient(IZkConnection zkConnection, int connectionTimeout, long operationRetryTimeout,
       PathBasedZkSerializer zkSerializer,
       String monitorType, String monitorKey, String monitorInstanceName,
       boolean monitorRootPathOnly) {
-    this(zkConnection, connectionTimeout, operationRetryTimeout, zkSerializer, monitorType, monitorKey,
-        monitorInstanceName, monitorRootPathOnly, true);
+    this(zkConnection, connectionTimeout, operationRetryTimeout, zkSerializer, monitorType,
+        monitorKey, monitorInstanceName, monitorRootPathOnly, true, false);
   }
 
   public ZkClient(IZkConnection connection, int connectionTimeout,
@@ -200,6 +200,7 @@ public class ZkClient extends org.apache.helix.zookeeper.zkclient.ZkClient imple
     String _monitorInstanceName = null;
     boolean _monitorRootPathOnly = true;
     boolean _connectOnInit = true;
+    boolean _usePersistWatcher = false;
 
     /**
      * If set true, the client will connect to ZK during initialization.
@@ -278,6 +279,11 @@ public class ZkClient extends org.apache.helix.zookeeper.zkclient.ZkClient imple
       return this;
     }
 
+    public Builder setUsePersistWatcher(boolean usePersistWatcher) {
+      this._usePersistWatcher = usePersistWatcher;
+      return this;
+    }
+
     public ZkClient build() {
       if (_connection == null) {
         if (_zkServer == null) {
@@ -293,7 +299,8 @@ public class ZkClient extends org.apache.helix.zookeeper.zkclient.ZkClient imple
       }
 
       return new ZkClient(_connection, _connectionTimeout, _operationRetryTimeout, _zkSerializer,
-          _monitorType, _monitorKey, _monitorInstanceName, _monitorRootPathOnly, _connectOnInit);
+          _monitorType, _monitorKey, _monitorInstanceName, _monitorRootPathOnly, _connectOnInit,
+          _usePersistWatcher);
     }
   }
 }
diff --git a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/ZkClient.java b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/ZkClient.java
index dbb73864b..a8fb3ede2 100644
--- a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/ZkClient.java
+++ b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/ZkClient.java
@@ -133,6 +133,7 @@ public class ZkClient implements Watcher {
   private volatile boolean _closed;
   private PathBasedZkSerializer _pathBasedZkSerializer;
   private ZkClientMonitor _monitor;
+  private boolean _usePersistWatcher;
 
   // To automatically retry the async operation, we need a separate thread other than the
   // ZkEventThread. Otherwise the retry request might block the normal event processing.
@@ -216,7 +217,7 @@ public class ZkClient implements Watcher {
 
   protected ZkClient(IZkConnection zkConnection, int connectionTimeout, long operationRetryTimeout,
       PathBasedZkSerializer zkSerializer, String monitorType, String monitorKey,
-      String monitorInstanceName, boolean monitorRootPathOnly, boolean connectOnInit) {
+      String monitorInstanceName, boolean monitorRootPathOnly, boolean connectOnInit, boolean usePersistWatcher) {
     if (zkConnection == null) {
       throw new NullPointerException("Zookeeper connection is null!");
     }
@@ -245,13 +246,14 @@ public class ZkClient implements Watcher {
     if (connectOnInit) {
       connect(connectionTimeout, this);
     }
+    _usePersistWatcher = usePersistWatcher;
   }
 
   protected ZkClient(IZkConnection zkConnection, int connectionTimeout, long operationRetryTimeout,
       PathBasedZkSerializer zkSerializer, String monitorType, String monitorKey,
       String monitorInstanceName, boolean monitorRootPathOnly) {
     this(zkConnection, connectionTimeout, operationRetryTimeout, zkSerializer, monitorType, monitorKey,
-        monitorInstanceName, monitorRootPathOnly, true);
+        monitorInstanceName, monitorRootPathOnly, true, false);
   }
 
   public List<String> subscribeChildChanges(String path, IZkChildListener listener) {