You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by xy...@apache.org on 2023/01/09 17:57:00 UTC
[helix] branch metaclient updated: New features and improvement in zookeeper-api to prepare meta-client implementation (#2333)
This is an automated email from the ASF dual-hosted git repository.
xyuanlu pushed a commit to branch metaclient
in repository https://gitbox.apache.org/repos/asf/helix.git
The following commit(s) were added to refs/heads/metaclient by this push:
new 9bda33d4e New features and improvement in zookeeper-api to prepare meta-client implementation (#2333)
9bda33d4e is described below
commit 9bda33d4eeb5f1dac83b7b77a69a83c3ecfda0ed
Author: Qi (Quincy) Qu <qq...@linkedin.com>
AuthorDate: Mon Jan 9 12:56:54 2023 -0500
New features and improvement in zookeeper-api to prepare meta-client implementation (#2333)
Prepare zkclient for meta-client DataChangeListener add new method in IZkDataListener
Implement boolean flag connectOnInit in zkclient for decoupling
Override equals and hashcode for the converter class
---
.github/workflows/Helix-PR-CI.yml | 2 +-
.../helix/metaclient/impl/zk/ZkMetaClient.java | 54 ++++++++++++++++++++++
.../helix/zookeeper/impl/client/ZkClient.java | 26 +++++++++--
.../helix/zookeeper/zkclient/IZkDataListener.java | 7 +++
.../apache/helix/zookeeper/zkclient/ZkClient.java | 43 +++++++++++------
.../zookeeper/zkclient/metric/ZkClientMonitor.java | 6 +++
6 files changed, 119 insertions(+), 19 deletions(-)
diff --git a/.github/workflows/Helix-PR-CI.yml b/.github/workflows/Helix-PR-CI.yml
index 7ad3c89fe..ddb56fe47 100644
--- a/.github/workflows/Helix-PR-CI.yml
+++ b/.github/workflows/Helix-PR-CI.yml
@@ -1,7 +1,7 @@
name: Helix PR CI
on:
pull_request:
- branches: [ master ]
+ branches: [ master, metaclient ] # TODO: remove side branch
paths-ignore:
- '.github/**'
- 'helix-front/**'
diff --git a/meta-client/src/main/java/org/apache/helix/metaclient/impl/zk/ZkMetaClient.java b/meta-client/src/main/java/org/apache/helix/metaclient/impl/zk/ZkMetaClient.java
index 194cf18f9..b3270ad81 100644
--- a/meta-client/src/main/java/org/apache/helix/metaclient/impl/zk/ZkMetaClient.java
+++ b/meta-client/src/main/java/org/apache/helix/metaclient/impl/zk/ZkMetaClient.java
@@ -33,7 +33,9 @@ import org.apache.helix.metaclient.api.MetaClientInterface;
import org.apache.helix.metaclient.api.OpResult;
import org.apache.helix.metaclient.impl.zk.factory.ZkMetaClientConfig;
import org.apache.helix.zookeeper.impl.client.ZkClient;
+import org.apache.helix.zookeeper.zkclient.IZkDataListener;
import org.apache.helix.zookeeper.zkclient.ZkConnection;
+import org.apache.zookeeper.Watcher;
public class ZkMetaClient implements MetaClientInterface {
@@ -247,4 +249,56 @@ public class ZkMetaClient implements MetaClientInterface {
public List<OpResult> transactionOP(Iterable iterable) {
return null;
}
+
+ /**
+ * A converter class to transform {@link DataChangeListener} to {@link IZkDataListener}
+ */
+ static class DataListenerConverter implements IZkDataListener {
+ private final DataChangeListener _listener;
+
+ DataListenerConverter(DataChangeListener listener) {
+ _listener = listener;
+ }
+
+ private DataChangeListener.ChangeType convertType(Watcher.Event.EventType eventType) {
+ switch (eventType) {
+ case NodeCreated: return DataChangeListener.ChangeType.ENTRY_CREATED;
+ case NodeDataChanged: return DataChangeListener.ChangeType.ENTRY_UPDATE;
+ case NodeDeleted: return DataChangeListener.ChangeType.ENTRY_DELETED;
+ default: throw new IllegalArgumentException("EventType " + eventType + " is not supported.");
+ }
+ }
+
+ @Override
+ public void handleDataChange(String dataPath, Object data) throws Exception {
+ throw new UnsupportedOperationException("handleDataChange(String dataPath, Object data) is not supported.");
+ }
+
+ @Override
+ public void handleDataDeleted(String dataPath) throws Exception {
+ handleDataChange(dataPath, null, Watcher.Event.EventType.NodeDeleted);
+ }
+
+ @Override
+ public void handleDataChange(String dataPath, Object data, Watcher.Event.EventType eventType) throws Exception {
+ _listener.handleDataChange(dataPath, data, convertType(eventType));
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ DataListenerConverter that = (DataListenerConverter) o;
+ return _listener.equals(that._listener);
+ }
+
+ @Override
+ public int hashCode() {
+ return _listener.hashCode();
+ }
+ }
}
diff --git a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/impl/client/ZkClient.java b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/impl/client/ZkClient.java
index 7e5c80a7e..bd76595fa 100644
--- a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/impl/client/ZkClient.java
+++ b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/impl/client/ZkClient.java
@@ -85,13 +85,23 @@ public class ZkClient extends org.apache.helix.zookeeper.zkclient.ZkClient imple
* The JMX bean name will be: HelixZkClient.monitorType.monitorKey.monitorInstanceName.
* @param monitorRootPathOnly
* Should only stat of access to root path be reported to JMX bean or path-specific stat be reported too.
+ * @param connectOnInit true if connect to ZK during initialization, otherwise user will need to call connect
+ * explicitly before talking to ZK.
*/
public ZkClient(IZkConnection zkConnection, int connectionTimeout, long operationRetryTimeout,
PathBasedZkSerializer zkSerializer,
String monitorType, String monitorKey, String monitorInstanceName,
- boolean monitorRootPathOnly) {
+ boolean monitorRootPathOnly, boolean connectOnInit) {
super(zkConnection, connectionTimeout, operationRetryTimeout, zkSerializer, monitorType,
- monitorKey, monitorInstanceName, monitorRootPathOnly);
+ monitorKey, monitorInstanceName, monitorRootPathOnly, connectOnInit);
+ }
+
+ public ZkClient(IZkConnection zkConnection, int connectionTimeout, long operationRetryTimeout,
+ PathBasedZkSerializer zkSerializer,
+ String monitorType, String monitorKey, String monitorInstanceName,
+ boolean monitorRootPathOnly) {
+ this(zkConnection, connectionTimeout, operationRetryTimeout, zkSerializer, monitorType, monitorKey,
+ monitorInstanceName, monitorRootPathOnly, true);
}
public ZkClient(IZkConnection connection, int connectionTimeout,
@@ -187,6 +197,16 @@ public class ZkClient extends org.apache.helix.zookeeper.zkclient.ZkClient imple
String _monitorKey;
String _monitorInstanceName = null;
boolean _monitorRootPathOnly = true;
+ boolean _connectOnInit = true;
+
+ /**
+ * If set true, the client will connect to ZK during initialization.
+ * Otherwise, user has to call connect() method explicitly before talking to ZK.
+ */
+ public Builder setConnectOnInit(boolean connectOnInit) {
+ _connectOnInit = connectOnInit;
+ return this;
+ }
public Builder setConnection(IZkConnection connection) {
this._connection = connection;
@@ -271,7 +291,7 @@ public class ZkClient extends org.apache.helix.zookeeper.zkclient.ZkClient imple
}
return new ZkClient(_connection, _connectionTimeout, _operationRetryTimeout, _zkSerializer,
- _monitorType, _monitorKey, _monitorInstanceName, _monitorRootPathOnly);
+ _monitorType, _monitorKey, _monitorInstanceName, _monitorRootPathOnly, _connectOnInit);
}
}
}
diff --git a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/IZkDataListener.java b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/IZkDataListener.java
index 6d90e8de8..0a3e8b3e5 100644
--- a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/IZkDataListener.java
+++ b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/IZkDataListener.java
@@ -19,6 +19,9 @@ package org.apache.helix.zookeeper.zkclient;
* under the License.
*/
+import org.apache.zookeeper.Watcher;
+
+
/**
* An {@link IZkDataListener} can be registered at a {@link ZkClient} for listening on zk data changes for a given path.
*
@@ -31,4 +34,8 @@ public interface IZkDataListener {
public void handleDataChange(String dataPath, Object data) throws Exception;
public void handleDataDeleted(String dataPath) throws Exception;
+
+ default void handleDataChange(String dataPath, Object data, Watcher.Event.EventType eventType) throws Exception {
+ handleDataChange(dataPath, data);
+ }
}
diff --git a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/ZkClient.java b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/ZkClient.java
index 527e46f90..4fca90da9 100644
--- a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/ZkClient.java
+++ b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/ZkClient.java
@@ -214,7 +214,7 @@ public class ZkClient implements Watcher {
protected ZkClient(IZkConnection zkConnection, int connectionTimeout, long operationRetryTimeout,
PathBasedZkSerializer zkSerializer, String monitorType, String monitorKey,
- String monitorInstanceName, boolean monitorRootPathOnly) {
+ String monitorInstanceName, boolean monitorRootPathOnly, boolean connectOnInit) {
if (zkConnection == null) {
throw new NullPointerException("Zookeeper connection is null!");
}
@@ -240,17 +240,18 @@ public class ZkClient implements Watcher {
LOG.info("ZkClient monitor key or type is not provided. Skip monitoring.");
}
- connect(connectionTimeout, this);
-
- try {
- if (_monitor != null) {
- _monitor.register();
- }
- } catch (JMException e){
- LOG.error("Error in creating ZkClientMonitor", e);
+ if (connectOnInit) {
+ connect(connectionTimeout, this);
}
}
+ protected ZkClient(IZkConnection zkConnection, int connectionTimeout, long operationRetryTimeout,
+ PathBasedZkSerializer zkSerializer, String monitorType, String monitorKey,
+ String monitorInstanceName, boolean monitorRootPathOnly) {
+ this(zkConnection, connectionTimeout, operationRetryTimeout, zkSerializer, monitorType, monitorKey,
+ monitorInstanceName, monitorRootPathOnly, true);
+ }
+
public List<String> subscribeChildChanges(String path, IZkChildListener listener) {
ChildrenSubscribeResult result = subscribeChildChanges(path, listener, false);
return result.getChildren();
@@ -1312,13 +1313,13 @@ public class ZkClient implements Watcher {
}
}
- private void fireAllEvents() {
+ private void fireAllEvents(WatchedEvent event) {
//TODO: During handling new session, if the path is deleted, watcher leakage could still happen
for (Entry<String, Set<IZkChildListener>> entry : _childListener.entrySet()) {
fireChildChangedEvents(entry.getKey(), entry.getValue(), true);
}
for (Entry<String, Set<IZkDataListenerEntry>> entry : _dataListener.entrySet()) {
- fireDataChangedEvents(entry.getKey(), entry.getValue(), OptionalLong.empty(), true);
+ fireDataChangedEvents(entry.getKey(), entry.getValue(), OptionalLong.empty(), true, event.getType());
}
}
@@ -1518,7 +1519,7 @@ public class ZkClient implements Watcher {
* reconnecting when the session expired. Because previous session expired, we also have to
* notify all listeners that something might have changed.
*/
- fireAllEvents();
+ fireAllEvents(event);
}
} else if (event.getState() == KeeperState.Expired) {
_isNewSessionEventFired = false;
@@ -1766,13 +1767,13 @@ public class ZkClient implements Watcher {
Set<IZkDataListenerEntry> listeners = _dataListener.get(path);
if (listeners != null && !listeners.isEmpty()) {
fireDataChangedEvents(event.getPath(), listeners, OptionalLong.of(notificationTime),
- pathExists);
+ pathExists, event.getType());
}
}
}
private void fireDataChangedEvents(final String path, Set<IZkDataListenerEntry> listeners,
- final OptionalLong notificationTime, boolean pathExists) {
+ final OptionalLong notificationTime, boolean pathExists, EventType eventType) {
try {
final ZkPathStatRecord pathStatRecord = new ZkPathStatRecord(path);
// Trigger listener callbacks
@@ -1815,7 +1816,7 @@ public class ZkClient implements Watcher {
return;
}
}
- listener.getDataListener().handleDataChange(path, data);
+ listener.getDataListener().handleDataChange(path, data, eventType);
}
}
});
@@ -2488,6 +2489,11 @@ public class ZkClient implements Watcher {
});
}
+ protected void connect(final long maxMsToWaitUntilConnected)
+ throws ZkInterruptedException, ZkTimeoutException, IllegalStateException {
+ connect(maxMsToWaitUntilConnected, this);
+ }
+
/**
* Connect to ZooKeeper.
* @param maxMsToWaitUntilConnected
@@ -2553,6 +2559,13 @@ public class ZkClient implements Watcher {
close();
}
}
+ try {
+ if (_monitor != null) {
+ _monitor.register();
+ }
+ } catch (JMException e){
+ LOG.error("Error in creating ZkClientMonitor", e);
+ }
}
public long getCreationTime(String path) {
diff --git a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/metric/ZkClientMonitor.java b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/metric/ZkClientMonitor.java
index d0a37bb6e..aad5eb76d 100644
--- a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/metric/ZkClientMonitor.java
+++ b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/metric/ZkClientMonitor.java
@@ -55,6 +55,7 @@ public class ZkClientMonitor extends DynamicMBeanProvider {
private String _monitorKey;
private String _monitorInstanceName;
private boolean _monitorRootOnly;
+ private volatile boolean _registered = false;
private SimpleDynamicMetric<Long> _stateChangeEventCounter;
private SimpleDynamicMetric<Long> _expiredSessionCounter;
@@ -123,6 +124,9 @@ public class ZkClientMonitor extends DynamicMBeanProvider {
@Override
public DynamicMBeanProvider register() throws JMException {
+ if (_registered) {
+ return this;
+ }
List<DynamicMetric<?, ?>> attributeList = new ArrayList<>();
attributeList.add(_dataChangeEventCounter);
attributeList.add(_outstandingRequestGauge);
@@ -143,6 +147,7 @@ public class ZkClientMonitor extends DynamicMBeanProvider {
}
}
});
+ _registered = true;
return this;
}
@@ -154,6 +159,7 @@ public class ZkClientMonitor extends DynamicMBeanProvider {
for (ZkClientPathMonitor zkClientPathMonitor : _zkClientPathMonitorMap.values()) {
zkClientPathMonitor.unregister();
}
+ _registered = false;
}
@Override