You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by xy...@apache.org on 2023/01/23 23:44:49 UTC

[helix] branch metaclient updated: Create adapter package for data and child change listener and prepare zkclient (#2346)

This is an automated email from the ASF dual-hosted git repository.

xyuanlu pushed a commit to branch metaclient
in repository https://gitbox.apache.org/repos/asf/helix.git


The following commit(s) were added to refs/heads/metaclient by this push:
     new 8d7063936 Create adapter package for data and child change listener and prepare zkclient (#2346)
8d7063936 is described below

commit 8d706393605b6d7796ba74bbac8f9a45cd2d7e4c
Author: Qi (Quincy) Qu <qq...@linkedin.com>
AuthorDate: Mon Jan 23 18:44:44 2023 -0500

    Create adapter package for data and child change listener and prepare zkclient (#2346)
    
    Prepare zkclient and implement new adapter for child change listener.
---
 .../helix/metaclient/impl/zk/ZkMetaClient.java     | 59 +----------------
 .../impl/zk/adapter/ChildListenerAdapter.java      | 75 +++++++++++++++++++++
 .../impl/zk/adapter/DataListenerAdapter.java       | 77 ++++++++++++++++++++++
 .../helix/zookeeper/zkclient/IZkChildListener.java | 15 +++++
 .../apache/helix/zookeeper/zkclient/ZkClient.java  |  9 +--
 5 files changed, 175 insertions(+), 60 deletions(-)

diff --git a/meta-client/src/main/java/org/apache/helix/metaclient/impl/zk/ZkMetaClient.java b/meta-client/src/main/java/org/apache/helix/metaclient/impl/zk/ZkMetaClient.java
index 00ad18f73..631bd1a3a 100644
--- a/meta-client/src/main/java/org/apache/helix/metaclient/impl/zk/ZkMetaClient.java
+++ b/meta-client/src/main/java/org/apache/helix/metaclient/impl/zk/ZkMetaClient.java
@@ -38,9 +38,9 @@ import org.apache.helix.metaclient.constants.MetaClientException;
 import org.apache.helix.metaclient.constants.MetaClientInterruptException;
 import org.apache.helix.metaclient.constants.MetaClientNoNodeException;
 import org.apache.helix.metaclient.constants.MetaClientTimeoutException;
+import org.apache.helix.metaclient.impl.zk.adapter.DataListenerAdapter;
 import org.apache.helix.metaclient.impl.zk.factory.ZkMetaClientConfig;
 import org.apache.helix.zookeeper.impl.client.ZkClient;
-import org.apache.helix.zookeeper.zkclient.IZkDataListener;
 import org.apache.helix.zookeeper.zkclient.ZkConnection;
 import org.apache.helix.zookeeper.zkclient.exception.ZkBadVersionException;
 import org.apache.helix.zookeeper.zkclient.exception.ZkException;
@@ -49,7 +49,6 @@ import org.apache.helix.zookeeper.zkclient.exception.ZkNodeExistsException;
 import org.apache.helix.zookeeper.zkclient.exception.ZkTimeoutException;
 import org.apache.zookeeper.CreateMode;
 import org.apache.zookeeper.KeeperException;
-import org.apache.zookeeper.Watcher;
 import org.apache.zookeeper.server.EphemeralType;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -261,7 +260,7 @@ public class ZkMetaClient<T> implements MetaClientInterface<T>, AutoCloseable {
     if (!persistListener) {
       throw new NotImplementedException("Currently the non-persist (one-time) listener is not supported in ZkMetaClient.");
     }
-    _zkClient.subscribeDataChanges(key, new DataListenerConverter(listener));
+    _zkClient.subscribeDataChanges(key, new DataListenerAdapter(listener));
     return false;
   }
 
@@ -286,7 +285,7 @@ public class ZkMetaClient<T> implements MetaClientInterface<T>, AutoCloseable {
 
   @Override
   public void unsubscribeDataChange(String key, DataChangeListener listener) {
-    _zkClient.unsubscribeDataChanges(key, new DataListenerConverter(listener));
+    _zkClient.unsubscribeDataChanges(key, new DataListenerAdapter(listener));
   }
 
   @Override
@@ -339,58 +338,6 @@ public class ZkMetaClient<T> implements MetaClientInterface<T>, AutoCloseable {
     disconnect();
   }
 
-  /**
-   * A converter class to transform {@link DataChangeListener} to {@link IZkDataListener}
-   */
-  static class DataListenerConverter implements IZkDataListener {
-    private final DataChangeListener _listener;
-
-    DataListenerConverter(DataChangeListener listener) {
-      _listener = listener;
-    }
-
-    private DataChangeListener.ChangeType convertType(Watcher.Event.EventType eventType) {
-      switch (eventType) {
-        case NodeCreated: return DataChangeListener.ChangeType.ENTRY_CREATED;
-        case NodeDataChanged: return DataChangeListener.ChangeType.ENTRY_UPDATE;
-        case NodeDeleted: return DataChangeListener.ChangeType.ENTRY_DELETED;
-        default: throw new IllegalArgumentException("EventType " + eventType + " is not supported.");
-      }
-    }
-
-    @Override
-    public void handleDataChange(String dataPath, Object data) throws Exception {
-      throw new UnsupportedOperationException("handleDataChange(String dataPath, Object data) is not supported.");
-    }
-
-    @Override
-    public void handleDataDeleted(String dataPath) throws Exception {
-      handleDataChange(dataPath, null, Watcher.Event.EventType.NodeDeleted);
-    }
-
-    @Override
-    public void handleDataChange(String dataPath, Object data, Watcher.Event.EventType eventType) throws Exception {
-      _listener.handleDataChange(dataPath, data, convertType(eventType));
-    }
-
-    @Override
-    public boolean equals(Object o) {
-      if (this == o) {
-        return true;
-      }
-      if (o == null || getClass() != o.getClass()) {
-        return false;
-      }
-      DataListenerConverter that = (DataListenerConverter) o;
-      return _listener.equals(that._listener);
-    }
-
-    @Override
-    public int hashCode() {
-      return _listener.hashCode();
-    }
-  }
-
   private static MetaClientException translateZkExceptionToMetaclientException(ZkException e) {
     if (e instanceof ZkNodeExistsException) {
       return new MetaClientNoNodeException(e);
diff --git a/meta-client/src/main/java/org/apache/helix/metaclient/impl/zk/adapter/ChildListenerAdapter.java b/meta-client/src/main/java/org/apache/helix/metaclient/impl/zk/adapter/ChildListenerAdapter.java
new file mode 100644
index 000000000..28385ff2a
--- /dev/null
+++ b/meta-client/src/main/java/org/apache/helix/metaclient/impl/zk/adapter/ChildListenerAdapter.java
@@ -0,0 +1,75 @@
+package org.apache.helix.metaclient.impl.zk.adapter;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.List;
+import org.apache.helix.metaclient.api.ChildChangeListener;
+import org.apache.helix.zookeeper.zkclient.IZkChildListener;
+import org.apache.zookeeper.Watcher;
+
+
+/**
+ * An adapter class to transform {@link ChildChangeListener} to {@link IZkChildListener}.
+ */
+public class ChildListenerAdapter implements IZkChildListener {
+  private final ChildChangeListener _listener;
+
+  public ChildListenerAdapter(ChildChangeListener listener) {
+    _listener = listener;
+  }
+
+  @Override
+  public void handleChildChange(String parentPath, List<String> currentChilds) throws Exception {
+    throw new UnsupportedOperationException("handleChildChange(String parentPath, List<String> currentChilds) "
+        + "is not supported");
+  }
+
+  @Override
+  public void handleChildChange(String parentPath, List<String> currentChilds, Watcher.Event.EventType eventType)
+      throws Exception {
+    _listener.handleChildChange(parentPath, convertType(eventType));
+  }
+
+  private static ChildChangeListener.ChangeType convertType(Watcher.Event.EventType eventType) {
+    switch (eventType) {
+      case NodeCreated: return ChildChangeListener.ChangeType.ENTRY_CREATED;
+      case NodeChildrenChanged: return ChildChangeListener.ChangeType.ENTRY_DATA_CHANGE;
+      case NodeDeleted: return ChildChangeListener.ChangeType.ENTRY_DELETED;
+      default: throw new IllegalArgumentException("EventType " + eventType + " is not supported.");
+    }
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) {
+      return true;
+    }
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
+    ChildListenerAdapter that = (ChildListenerAdapter) o;
+    return _listener.equals(that._listener);
+  }
+
+  @Override
+  public int hashCode() {
+    return _listener.hashCode();
+  }
+}
diff --git a/meta-client/src/main/java/org/apache/helix/metaclient/impl/zk/adapter/DataListenerAdapter.java b/meta-client/src/main/java/org/apache/helix/metaclient/impl/zk/adapter/DataListenerAdapter.java
new file mode 100644
index 000000000..94ae198ce
--- /dev/null
+++ b/meta-client/src/main/java/org/apache/helix/metaclient/impl/zk/adapter/DataListenerAdapter.java
@@ -0,0 +1,77 @@
+package org.apache.helix.metaclient.impl.zk.adapter;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import org.apache.helix.metaclient.api.DataChangeListener;
+import org.apache.helix.zookeeper.zkclient.IZkDataListener;
+import org.apache.zookeeper.Watcher;
+
+
+/**
+ * An adapter class to transform {@link DataChangeListener} to {@link IZkDataListener}.
+ */
+public class DataListenerAdapter implements IZkDataListener {
+  private final DataChangeListener _listener;
+
+  public DataListenerAdapter(DataChangeListener listener) {
+    _listener = listener;
+  }
+
+  @Override
+  public void handleDataChange(String dataPath, Object data) throws Exception {
+    throw new UnsupportedOperationException("handleDataChange(String dataPath, Object data) is not supported.");
+  }
+
+  @Override
+  public void handleDataDeleted(String dataPath) throws Exception {
+    handleDataChange(dataPath, null, Watcher.Event.EventType.NodeDeleted);
+  }
+
+  @Override
+  public void handleDataChange(String dataPath, Object data, Watcher.Event.EventType eventType) throws Exception {
+    _listener.handleDataChange(dataPath, data, convertType(eventType));
+  }
+
+  private static DataChangeListener.ChangeType convertType(Watcher.Event.EventType eventType) {
+    switch (eventType) {
+      case NodeCreated: return DataChangeListener.ChangeType.ENTRY_CREATED;
+      case NodeDataChanged: return DataChangeListener.ChangeType.ENTRY_UPDATE;
+      case NodeDeleted: return DataChangeListener.ChangeType.ENTRY_DELETED;
+      default: throw new IllegalArgumentException("EventType " + eventType + " is not supported.");
+    }
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) {
+      return true;
+    }
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
+    DataListenerAdapter that = (DataListenerAdapter) o;
+    return _listener.equals(that._listener);
+  }
+
+  @Override
+  public int hashCode() {
+    return _listener.hashCode();
+  }
+}
diff --git a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/IZkChildListener.java b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/IZkChildListener.java
index 7623a2ec1..a150f1643 100644
--- a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/IZkChildListener.java
+++ b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/IZkChildListener.java
@@ -20,6 +20,8 @@ package org.apache.helix.zookeeper.zkclient;
  */
 
 import java.util.List;
+import org.apache.zookeeper.Watcher;
+
 
 /**
  * An {@link IZkChildListener} can be registered at a {@link ZkClient} for listening on zk child changes for a given
@@ -42,4 +44,17 @@ public interface IZkChildListener {
      * @throws Exception
      */
     public void handleChildChange(String parentPath, List<String> currentChilds) throws Exception;
+
+    /**
+     * Called when the children of the given path changed; by default delegates to the two-argument overload.
+     *
+     * @param parentPath The parent path
+     * @param currentChilds The children or null if the root node (parent path) was deleted.
+     * @param eventType The zookeeper event type
+     * @throws Exception
+     */
+    default void handleChildChange(String parentPath, List<String> currentChilds, Watcher.Event.EventType eventType)
+        throws Exception {
+        handleChildChange(parentPath, currentChilds);
+    }
 }
diff --git a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/ZkClient.java b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/ZkClient.java
index 747b7e7a0..1c00f9768 100644
--- a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/ZkClient.java
+++ b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/ZkClient.java
@@ -1316,7 +1316,7 @@ public class ZkClient implements Watcher {
   private void fireAllEvents(WatchedEvent event) {
     //TODO: During handling new session, if the path is deleted, watcher leakage could still happen
     for (Entry<String, Set<IZkChildListener>> entry : _childListener.entrySet()) {
-      fireChildChangedEvents(entry.getKey(), entry.getValue(), true);
+      fireChildChangedEvents(entry.getKey(), entry.getValue(), true, event.getType());
     }
     for (Entry<String, Set<IZkDataListenerEntry>> entry : _dataListener.entrySet()) {
       fireDataChangedEvents(entry.getKey(), entry.getValue(), OptionalLong.empty(), true, event.getType());
@@ -1758,7 +1758,7 @@ public class ZkClient implements Watcher {
       if (childListeners != null && !childListeners.isEmpty()) {
         // TODO recording child changed event propagation latency as well. Note this change will
         // introduce additional ZK access.
-        fireChildChangedEvents(path, childListeners, pathExists);
+        fireChildChangedEvents(path, childListeners, pathExists, event.getType());
       }
     }
 
@@ -1826,7 +1826,8 @@ public class ZkClient implements Watcher {
     }
   }
 
-  private void fireChildChangedEvents(final String path, Set<IZkChildListener> childListeners, boolean pathExists) {
+  private void fireChildChangedEvents(final String path, Set<IZkChildListener> childListeners, boolean pathExists,
+      EventType eventType) {
     try {
       final ZkPathStatRecord pathStatRecord = new ZkPathStatRecord(path);
       for (final IZkChildListener listener : childListeners) {
@@ -1853,7 +1854,7 @@ public class ZkClient implements Watcher {
                 // Continue trigger the change handler
               }
             }
-            listener.handleChildChange(path, children);
+            listener.handleChildChange(path, children, eventType);
           }
         });
       }