You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by xy...@apache.org on 2023/04/27 20:36:42 UTC
[helix] 33/37: Add an option in metaclient to use persist watcher (#2434)
This is an automated email from the ASF dual-hosted git repository.
xyuanlu pushed a commit to branch metaclient
in repository https://gitbox.apache.org/repos/asf/helix.git
commit 1e0914aaa55c0c697e790385888564bbb6fe876e
Author: xyuanlu <xy...@gmail.com>
AuthorDate: Mon Apr 10 09:09:39 2023 -0700
Add an option in metaclient to use persist watcher (#2434)
---
.../helix/metaclient/impl/zk/ZkMetaClient.java | 2 +-
.../helix/zookeeper/impl/client/ZkClient.java | 21 ++++++++++++++-------
.../apache/helix/zookeeper/zkclient/ZkClient.java | 6 ++++--
3 files changed, 19 insertions(+), 10 deletions(-)
diff --git a/meta-client/src/main/java/org/apache/helix/metaclient/impl/zk/ZkMetaClient.java b/meta-client/src/main/java/org/apache/helix/metaclient/impl/zk/ZkMetaClient.java
index 84c329fe4..af31423d0 100644
--- a/meta-client/src/main/java/org/apache/helix/metaclient/impl/zk/ZkMetaClient.java
+++ b/meta-client/src/main/java/org/apache/helix/metaclient/impl/zk/ZkMetaClient.java
@@ -91,7 +91,7 @@ public class ZkMetaClient<T> implements MetaClientInterface<T>, AutoCloseable {
new ZkConnection(config.getConnectionAddress(), (int) config.getSessionTimeoutInMillis()),
(int) _initConnectionTimeout, _reconnectTimeout /*use reconnect timeout for retry timeout*/,
config.getZkSerializer(), config.getMonitorType(), config.getMonitorKey(),
- config.getMonitorInstanceName(), config.getMonitorRootPathOnly(), false);
+ config.getMonitorInstanceName(), config.getMonitorRootPathOnly(), false, true);
_zkClientReconnectMonitor = Executors.newSingleThreadScheduledExecutor();
_reconnectStateChangeListener = new ReconnectStateChangeListener();
}
diff --git a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/impl/client/ZkClient.java b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/impl/client/ZkClient.java
index f3742b55b..1dd601c21 100644
--- a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/impl/client/ZkClient.java
+++ b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/impl/client/ZkClient.java
@@ -91,19 +91,19 @@ public class ZkClient extends org.apache.helix.zookeeper.zkclient.ZkClient imple
* explicitly before talking to ZK.
*/
public ZkClient(IZkConnection zkConnection, int connectionTimeout, long operationRetryTimeout,
- PathBasedZkSerializer zkSerializer,
- String monitorType, String monitorKey, String monitorInstanceName,
- boolean monitorRootPathOnly, boolean connectOnInit) {
+ PathBasedZkSerializer zkSerializer, String monitorType, String monitorKey,
+ String monitorInstanceName, boolean monitorRootPathOnly, boolean connectOnInit,
+ boolean usePersistWatcher) {
super(zkConnection, connectionTimeout, operationRetryTimeout, zkSerializer, monitorType,
- monitorKey, monitorInstanceName, monitorRootPathOnly, connectOnInit);
+ monitorKey, monitorInstanceName, monitorRootPathOnly, connectOnInit, usePersistWatcher);
}
public ZkClient(IZkConnection zkConnection, int connectionTimeout, long operationRetryTimeout,
PathBasedZkSerializer zkSerializer,
String monitorType, String monitorKey, String monitorInstanceName,
boolean monitorRootPathOnly) {
- this(zkConnection, connectionTimeout, operationRetryTimeout, zkSerializer, monitorType, monitorKey,
- monitorInstanceName, monitorRootPathOnly, true);
+ this(zkConnection, connectionTimeout, operationRetryTimeout, zkSerializer, monitorType,
+ monitorKey, monitorInstanceName, monitorRootPathOnly, true, false);
}
public ZkClient(IZkConnection connection, int connectionTimeout,
@@ -200,6 +200,7 @@ public class ZkClient extends org.apache.helix.zookeeper.zkclient.ZkClient imple
String _monitorInstanceName = null;
boolean _monitorRootPathOnly = true;
boolean _connectOnInit = true;
+ boolean _usePersistWatcher = false;
/**
* If set true, the client will connect to ZK during initialization.
@@ -278,6 +279,11 @@ public class ZkClient extends org.apache.helix.zookeeper.zkclient.ZkClient imple
return this;
}
+ public Builder setUsePersistWatcher(boolean usePersistWatcher) {
+ this._usePersistWatcher = usePersistWatcher;
+ return this;
+ }
+
public ZkClient build() {
if (_connection == null) {
if (_zkServer == null) {
@@ -293,7 +299,8 @@ public class ZkClient extends org.apache.helix.zookeeper.zkclient.ZkClient imple
}
return new ZkClient(_connection, _connectionTimeout, _operationRetryTimeout, _zkSerializer,
- _monitorType, _monitorKey, _monitorInstanceName, _monitorRootPathOnly, _connectOnInit);
+ _monitorType, _monitorKey, _monitorInstanceName, _monitorRootPathOnly, _connectOnInit,
+ _usePersistWatcher);
}
}
}
diff --git a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/ZkClient.java b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/ZkClient.java
index dbb73864b..a8fb3ede2 100644
--- a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/ZkClient.java
+++ b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/ZkClient.java
@@ -133,6 +133,7 @@ public class ZkClient implements Watcher {
private volatile boolean _closed;
private PathBasedZkSerializer _pathBasedZkSerializer;
private ZkClientMonitor _monitor;
+ private boolean _usePersistWatcher;
// To automatically retry the async operation, we need a separate thread other than the
// ZkEventThread. Otherwise the retry request might block the normal event processing.
@@ -216,7 +217,7 @@ public class ZkClient implements Watcher {
protected ZkClient(IZkConnection zkConnection, int connectionTimeout, long operationRetryTimeout,
PathBasedZkSerializer zkSerializer, String monitorType, String monitorKey,
- String monitorInstanceName, boolean monitorRootPathOnly, boolean connectOnInit) {
+ String monitorInstanceName, boolean monitorRootPathOnly, boolean connectOnInit, boolean usePersistWatcher) {
if (zkConnection == null) {
throw new NullPointerException("Zookeeper connection is null!");
}
@@ -245,13 +246,14 @@ public class ZkClient implements Watcher {
if (connectOnInit) {
connect(connectionTimeout, this);
}
+ _usePersistWatcher = usePersistWatcher;
}
protected ZkClient(IZkConnection zkConnection, int connectionTimeout, long operationRetryTimeout,
PathBasedZkSerializer zkSerializer, String monitorType, String monitorKey,
String monitorInstanceName, boolean monitorRootPathOnly) {
this(zkConnection, connectionTimeout, operationRetryTimeout, zkSerializer, monitorType, monitorKey,
- monitorInstanceName, monitorRootPathOnly, true);
+ monitorInstanceName, monitorRootPathOnly, true, false);
}
public List<String> subscribeChildChanges(String path, IZkChildListener listener) {