You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by xy...@apache.org on 2023/01/23 23:44:49 UTC
[helix] branch metaclient updated: Create adapter package for data and child change listener and prepare zkclient (#2346)
This is an automated email from the ASF dual-hosted git repository.
xyuanlu pushed a commit to branch metaclient
in repository https://gitbox.apache.org/repos/asf/helix.git
The following commit(s) were added to refs/heads/metaclient by this push:
new 8d7063936 Create adapter package for data and child change listener and prepare zkclient (#2346)
8d7063936 is described below
commit 8d706393605b6d7796ba74bbac8f9a45cd2d7e4c
Author: Qi (Quincy) Qu <qq...@linkedin.com>
AuthorDate: Mon Jan 23 18:44:44 2023 -0500
Create adapter package for data and child change listener and prepare zkclient (#2346)
Prepare zkclient and implement new adapter for child change listener.
---
.../helix/metaclient/impl/zk/ZkMetaClient.java | 59 +----------------
.../impl/zk/adapter/ChildListenerAdapter.java | 75 +++++++++++++++++++++
.../impl/zk/adapter/DataListenerAdapter.java | 77 ++++++++++++++++++++++
.../helix/zookeeper/zkclient/IZkChildListener.java | 15 +++++
.../apache/helix/zookeeper/zkclient/ZkClient.java | 9 +--
5 files changed, 175 insertions(+), 60 deletions(-)
diff --git a/meta-client/src/main/java/org/apache/helix/metaclient/impl/zk/ZkMetaClient.java b/meta-client/src/main/java/org/apache/helix/metaclient/impl/zk/ZkMetaClient.java
index 00ad18f73..631bd1a3a 100644
--- a/meta-client/src/main/java/org/apache/helix/metaclient/impl/zk/ZkMetaClient.java
+++ b/meta-client/src/main/java/org/apache/helix/metaclient/impl/zk/ZkMetaClient.java
@@ -38,9 +38,9 @@ import org.apache.helix.metaclient.constants.MetaClientException;
import org.apache.helix.metaclient.constants.MetaClientInterruptException;
import org.apache.helix.metaclient.constants.MetaClientNoNodeException;
import org.apache.helix.metaclient.constants.MetaClientTimeoutException;
+import org.apache.helix.metaclient.impl.zk.adapter.DataListenerAdapter;
import org.apache.helix.metaclient.impl.zk.factory.ZkMetaClientConfig;
import org.apache.helix.zookeeper.impl.client.ZkClient;
-import org.apache.helix.zookeeper.zkclient.IZkDataListener;
import org.apache.helix.zookeeper.zkclient.ZkConnection;
import org.apache.helix.zookeeper.zkclient.exception.ZkBadVersionException;
import org.apache.helix.zookeeper.zkclient.exception.ZkException;
@@ -49,7 +49,6 @@ import org.apache.helix.zookeeper.zkclient.exception.ZkNodeExistsException;
import org.apache.helix.zookeeper.zkclient.exception.ZkTimeoutException;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
-import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.server.EphemeralType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -261,7 +260,7 @@ public class ZkMetaClient<T> implements MetaClientInterface<T>, AutoCloseable {
if (!persistListener) {
throw new NotImplementedException("Currently the non-persist (one-time) listener is not supported in ZkMetaClient.");
}
- _zkClient.subscribeDataChanges(key, new DataListenerConverter(listener));
+ _zkClient.subscribeDataChanges(key, new DataListenerAdapter(listener));
return false;
}
@@ -286,7 +285,7 @@ public class ZkMetaClient<T> implements MetaClientInterface<T>, AutoCloseable {
@Override
public void unsubscribeDataChange(String key, DataChangeListener listener) {
- _zkClient.unsubscribeDataChanges(key, new DataListenerConverter(listener));
+ _zkClient.unsubscribeDataChanges(key, new DataListenerAdapter(listener));
}
@Override
@@ -339,58 +338,6 @@ public class ZkMetaClient<T> implements MetaClientInterface<T>, AutoCloseable {
disconnect();
}
- /**
- * A converter class to transform {@link DataChangeListener} to {@link IZkDataListener}
- */
- static class DataListenerConverter implements IZkDataListener {
- private final DataChangeListener _listener;
-
- DataListenerConverter(DataChangeListener listener) {
- _listener = listener;
- }
-
- private DataChangeListener.ChangeType convertType(Watcher.Event.EventType eventType) {
- switch (eventType) {
- case NodeCreated: return DataChangeListener.ChangeType.ENTRY_CREATED;
- case NodeDataChanged: return DataChangeListener.ChangeType.ENTRY_UPDATE;
- case NodeDeleted: return DataChangeListener.ChangeType.ENTRY_DELETED;
- default: throw new IllegalArgumentException("EventType " + eventType + " is not supported.");
- }
- }
-
- @Override
- public void handleDataChange(String dataPath, Object data) throws Exception {
- throw new UnsupportedOperationException("handleDataChange(String dataPath, Object data) is not supported.");
- }
-
- @Override
- public void handleDataDeleted(String dataPath) throws Exception {
- handleDataChange(dataPath, null, Watcher.Event.EventType.NodeDeleted);
- }
-
- @Override
- public void handleDataChange(String dataPath, Object data, Watcher.Event.EventType eventType) throws Exception {
- _listener.handleDataChange(dataPath, data, convertType(eventType));
- }
-
- @Override
- public boolean equals(Object o) {
- if (this == o) {
- return true;
- }
- if (o == null || getClass() != o.getClass()) {
- return false;
- }
- DataListenerConverter that = (DataListenerConverter) o;
- return _listener.equals(that._listener);
- }
-
- @Override
- public int hashCode() {
- return _listener.hashCode();
- }
- }
-
private static MetaClientException translateZkExceptionToMetaclientException(ZkException e) {
if (e instanceof ZkNodeExistsException) {
return new MetaClientNoNodeException(e);
diff --git a/meta-client/src/main/java/org/apache/helix/metaclient/impl/zk/adapter/ChildListenerAdapter.java b/meta-client/src/main/java/org/apache/helix/metaclient/impl/zk/adapter/ChildListenerAdapter.java
new file mode 100644
index 000000000..28385ff2a
--- /dev/null
+++ b/meta-client/src/main/java/org/apache/helix/metaclient/impl/zk/adapter/ChildListenerAdapter.java
@@ -0,0 +1,75 @@
+package org.apache.helix.metaclient.impl.zk.adapter;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.List;
+import org.apache.helix.metaclient.api.ChildChangeListener;
+import org.apache.helix.zookeeper.zkclient.IZkChildListener;
+import org.apache.zookeeper.Watcher;
+
+
+/**
+ * A adapter class to transform {@link ChildChangeListener} to {@link IZkChildListener}.
+ */
+public class ChildListenerAdapter implements IZkChildListener {
+ private final ChildChangeListener _listener;
+
+ public ChildListenerAdapter(ChildChangeListener listener) {
+ _listener = listener;
+ }
+
+ @Override
+ public void handleChildChange(String parentPath, List<String> currentChilds) throws Exception {
+ throw new UnsupportedOperationException("handleChildChange(String parentPath, List<String> currentChilds) "
+ + "is not supported");
+ }
+
+ @Override
+ public void handleChildChange(String parentPath, List<String> currentChilds, Watcher.Event.EventType eventType)
+ throws Exception {
+ _listener.handleChildChange(parentPath, convertType(eventType));
+ }
+
+ private static ChildChangeListener.ChangeType convertType(Watcher.Event.EventType eventType) {
+ switch (eventType) {
+ case NodeCreated: return ChildChangeListener.ChangeType.ENTRY_CREATED;
+ case NodeChildrenChanged: return ChildChangeListener.ChangeType.ENTRY_DATA_CHANGE;
+ case NodeDeleted: return ChildChangeListener.ChangeType.ENTRY_DELETED;
+ default: throw new IllegalArgumentException("EventType " + eventType + " is not supported.");
+ }
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ ChildListenerAdapter that = (ChildListenerAdapter) o;
+ return _listener.equals(that._listener);
+ }
+
+ @Override
+ public int hashCode() {
+ return _listener.hashCode();
+ }
+}
diff --git a/meta-client/src/main/java/org/apache/helix/metaclient/impl/zk/adapter/DataListenerAdapter.java b/meta-client/src/main/java/org/apache/helix/metaclient/impl/zk/adapter/DataListenerAdapter.java
new file mode 100644
index 000000000..94ae198ce
--- /dev/null
+++ b/meta-client/src/main/java/org/apache/helix/metaclient/impl/zk/adapter/DataListenerAdapter.java
@@ -0,0 +1,77 @@
+package org.apache.helix.metaclient.impl.zk.adapter;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import org.apache.helix.metaclient.api.DataChangeListener;
+import org.apache.helix.zookeeper.zkclient.IZkDataListener;
+import org.apache.zookeeper.Watcher;
+
+
+/**
+ * A Adapter class to transform {@link DataChangeListener} to {@link IZkDataListener}
+ */
+public class DataListenerAdapter implements IZkDataListener {
+ private final DataChangeListener _listener;
+
+ public DataListenerAdapter(DataChangeListener listener) {
+ _listener = listener;
+ }
+
+ @Override
+ public void handleDataChange(String dataPath, Object data) throws Exception {
+ throw new UnsupportedOperationException("handleDataChange(String dataPath, Object data) is not supported.");
+ }
+
+ @Override
+ public void handleDataDeleted(String dataPath) throws Exception {
+ handleDataChange(dataPath, null, Watcher.Event.EventType.NodeDeleted);
+ }
+
+ @Override
+ public void handleDataChange(String dataPath, Object data, Watcher.Event.EventType eventType) throws Exception {
+ _listener.handleDataChange(dataPath, data, convertType(eventType));
+ }
+
+ private static DataChangeListener.ChangeType convertType(Watcher.Event.EventType eventType) {
+ switch (eventType) {
+ case NodeCreated: return DataChangeListener.ChangeType.ENTRY_CREATED;
+ case NodeDataChanged: return DataChangeListener.ChangeType.ENTRY_UPDATE;
+ case NodeDeleted: return DataChangeListener.ChangeType.ENTRY_DELETED;
+ default: throw new IllegalArgumentException("EventType " + eventType + " is not supported.");
+ }
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ DataListenerAdapter that = (DataListenerAdapter) o;
+ return _listener.equals(that._listener);
+ }
+
+ @Override
+ public int hashCode() {
+ return _listener.hashCode();
+ }
+}
diff --git a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/IZkChildListener.java b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/IZkChildListener.java
index 7623a2ec1..a150f1643 100644
--- a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/IZkChildListener.java
+++ b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/IZkChildListener.java
@@ -20,6 +20,8 @@ package org.apache.helix.zookeeper.zkclient;
*/
import java.util.List;
+import org.apache.zookeeper.Watcher;
+
/**
* An {@link IZkChildListener} can be registered at a {@link ZkClient} for listening on zk child changes for a given
@@ -42,4 +44,17 @@ public interface IZkChildListener {
* @throws Exception
*/
public void handleChildChange(String parentPath, List<String> currentChilds) throws Exception;
+
+ /**
+ * Called when the children of the given path changed.
+ *
+ * @param parentPath The parent path
+ * @param currentChilds The children or null if the root node (parent path) was deleted.
+ * @param eventType The zookeeper event type
+ * @throws Exception
+ */
+ default void handleChildChange(String parentPath, List<String> currentChilds, Watcher.Event.EventType eventType)
+ throws Exception {
+ handleChildChange(parentPath, currentChilds);
+ }
}
diff --git a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/ZkClient.java b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/ZkClient.java
index 747b7e7a0..1c00f9768 100644
--- a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/ZkClient.java
+++ b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/ZkClient.java
@@ -1316,7 +1316,7 @@ public class ZkClient implements Watcher {
private void fireAllEvents(WatchedEvent event) {
//TODO: During handling new session, if the path is deleted, watcher leakage could still happen
for (Entry<String, Set<IZkChildListener>> entry : _childListener.entrySet()) {
- fireChildChangedEvents(entry.getKey(), entry.getValue(), true);
+ fireChildChangedEvents(entry.getKey(), entry.getValue(), true, event.getType());
}
for (Entry<String, Set<IZkDataListenerEntry>> entry : _dataListener.entrySet()) {
fireDataChangedEvents(entry.getKey(), entry.getValue(), OptionalLong.empty(), true, event.getType());
@@ -1758,7 +1758,7 @@ public class ZkClient implements Watcher {
if (childListeners != null && !childListeners.isEmpty()) {
// TODO recording child changed event propagation latency as well. Note this change will
// introduce additional ZK access.
- fireChildChangedEvents(path, childListeners, pathExists);
+ fireChildChangedEvents(path, childListeners, pathExists, event.getType());
}
}
@@ -1826,7 +1826,8 @@ public class ZkClient implements Watcher {
}
}
- private void fireChildChangedEvents(final String path, Set<IZkChildListener> childListeners, boolean pathExists) {
+ private void fireChildChangedEvents(final String path, Set<IZkChildListener> childListeners, boolean pathExists,
+ EventType eventType) {
try {
final ZkPathStatRecord pathStatRecord = new ZkPathStatRecord(path);
for (final IZkChildListener listener : childListeners) {
@@ -1853,7 +1854,7 @@ public class ZkClient implements Watcher {
// Continue trigger the change handler
}
}
- listener.handleChildChange(path, children);
+ listener.handleChildChange(path, children, eventType);
}
});
}