You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by xy...@apache.org on 2023/01/09 17:57:00 UTC

[helix] branch metaclient updated: New features and improvement in zookeeper-api to prepare meta-client implementation (#2333)

This is an automated email from the ASF dual-hosted git repository.

xyuanlu pushed a commit to branch metaclient
in repository https://gitbox.apache.org/repos/asf/helix.git


The following commit(s) were added to refs/heads/metaclient by this push:
     new 9bda33d4e New features and improvement in zookeeper-api to prepare meta-client implementation (#2333)
9bda33d4e is described below

commit 9bda33d4eeb5f1dac83b7b77a69a83c3ecfda0ed
Author: Qi (Quincy) Qu <qq...@linkedin.com>
AuthorDate: Mon Jan 9 12:56:54 2023 -0500

    New features and improvement in zookeeper-api to prepare meta-client implementation (#2333)
    
    Prepare zkclient for meta-client DataChangeListener add new method in IZkDataListener
    Implement boolean flag connectOnInit in zkclient for decoupling
    Override equals and hashcode for the converter class
---
 .github/workflows/Helix-PR-CI.yml                  |  2 +-
 .../helix/metaclient/impl/zk/ZkMetaClient.java     | 54 ++++++++++++++++++++++
 .../helix/zookeeper/impl/client/ZkClient.java      | 26 +++++++++--
 .../helix/zookeeper/zkclient/IZkDataListener.java  |  7 +++
 .../apache/helix/zookeeper/zkclient/ZkClient.java  | 43 +++++++++++------
 .../zookeeper/zkclient/metric/ZkClientMonitor.java |  6 +++
 6 files changed, 119 insertions(+), 19 deletions(-)

diff --git a/.github/workflows/Helix-PR-CI.yml b/.github/workflows/Helix-PR-CI.yml
index 7ad3c89fe..ddb56fe47 100644
--- a/.github/workflows/Helix-PR-CI.yml
+++ b/.github/workflows/Helix-PR-CI.yml
@@ -1,7 +1,7 @@
 name: Helix PR CI
 on:
   pull_request:
-    branches: [ master ]
+    branches: [ master, metaclient ] # TODO: remove side branch
     paths-ignore:
       - '.github/**'
       - 'helix-front/**'
diff --git a/meta-client/src/main/java/org/apache/helix/metaclient/impl/zk/ZkMetaClient.java b/meta-client/src/main/java/org/apache/helix/metaclient/impl/zk/ZkMetaClient.java
index 194cf18f9..b3270ad81 100644
--- a/meta-client/src/main/java/org/apache/helix/metaclient/impl/zk/ZkMetaClient.java
+++ b/meta-client/src/main/java/org/apache/helix/metaclient/impl/zk/ZkMetaClient.java
@@ -33,7 +33,9 @@ import org.apache.helix.metaclient.api.MetaClientInterface;
 import org.apache.helix.metaclient.api.OpResult;
 import org.apache.helix.metaclient.impl.zk.factory.ZkMetaClientConfig;
 import org.apache.helix.zookeeper.impl.client.ZkClient;
+import org.apache.helix.zookeeper.zkclient.IZkDataListener;
 import org.apache.helix.zookeeper.zkclient.ZkConnection;
+import org.apache.zookeeper.Watcher;
 
 
 public class ZkMetaClient implements MetaClientInterface {
@@ -247,4 +249,56 @@ public class ZkMetaClient implements MetaClientInterface {
   public List<OpResult> transactionOP(Iterable iterable) {
     return null;
   }
+
+  /**
+   * A converter class to transform {@link DataChangeListener} to {@link IZkDataListener}
+   */
+  static class DataListenerConverter implements IZkDataListener {
+    private final DataChangeListener _listener;
+
+    DataListenerConverter(DataChangeListener listener) {
+      _listener = listener;
+    }
+
+    private DataChangeListener.ChangeType convertType(Watcher.Event.EventType eventType) {
+      switch (eventType) {
+        case NodeCreated: return DataChangeListener.ChangeType.ENTRY_CREATED;
+        case NodeDataChanged: return DataChangeListener.ChangeType.ENTRY_UPDATE;
+        case NodeDeleted: return DataChangeListener.ChangeType.ENTRY_DELETED;
+        default: throw new IllegalArgumentException("EventType " + eventType + " is not supported.");
+      }
+    }
+
+    @Override
+    public void handleDataChange(String dataPath, Object data) throws Exception {
+      throw new UnsupportedOperationException("handleDataChange(String dataPath, Object data) is not supported.");
+    }
+
+    @Override
+    public void handleDataDeleted(String dataPath) throws Exception {
+      handleDataChange(dataPath, null, Watcher.Event.EventType.NodeDeleted);
+    }
+
+    @Override
+    public void handleDataChange(String dataPath, Object data, Watcher.Event.EventType eventType) throws Exception {
+      _listener.handleDataChange(dataPath, data, convertType(eventType));
+    }
+
+    @Override
+    public boolean equals(Object o) {
+      if (this == o) {
+        return true;
+      }
+      if (o == null || getClass() != o.getClass()) {
+        return false;
+      }
+      DataListenerConverter that = (DataListenerConverter) o;
+      return _listener.equals(that._listener);
+    }
+
+    @Override
+    public int hashCode() {
+      return _listener.hashCode();
+    }
+  }
 }
diff --git a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/impl/client/ZkClient.java b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/impl/client/ZkClient.java
index 7e5c80a7e..bd76595fa 100644
--- a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/impl/client/ZkClient.java
+++ b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/impl/client/ZkClient.java
@@ -85,13 +85,23 @@ public class ZkClient extends org.apache.helix.zookeeper.zkclient.ZkClient imple
    *            The JMX bean name will be: HelixZkClient.monitorType.monitorKey.monitorInstanceName.
    * @param monitorRootPathOnly
    *            Should only stat of access to root path be reported to JMX bean or path-specific stat be reported too.
+   * @param connectOnInit true if connect to ZK during initialization, otherwise user will need to call connect
+   *                      explicitly before talking to ZK.
    */
   public ZkClient(IZkConnection zkConnection, int connectionTimeout, long operationRetryTimeout,
       PathBasedZkSerializer zkSerializer,
       String monitorType, String monitorKey, String monitorInstanceName,
-      boolean monitorRootPathOnly) {
+      boolean monitorRootPathOnly, boolean connectOnInit) {
     super(zkConnection, connectionTimeout, operationRetryTimeout, zkSerializer, monitorType,
-        monitorKey, monitorInstanceName, monitorRootPathOnly);
+        monitorKey, monitorInstanceName, monitorRootPathOnly, connectOnInit);
+  }
+
+  public ZkClient(IZkConnection zkConnection, int connectionTimeout, long operationRetryTimeout,
+      PathBasedZkSerializer zkSerializer,
+      String monitorType, String monitorKey, String monitorInstanceName,
+      boolean monitorRootPathOnly) {
+    this(zkConnection, connectionTimeout, operationRetryTimeout, zkSerializer, monitorType, monitorKey,
+        monitorInstanceName, monitorRootPathOnly, true);
   }
 
   public ZkClient(IZkConnection connection, int connectionTimeout,
@@ -187,6 +197,16 @@ public class ZkClient extends org.apache.helix.zookeeper.zkclient.ZkClient imple
     String _monitorKey;
     String _monitorInstanceName = null;
     boolean _monitorRootPathOnly = true;
+    boolean _connectOnInit = true;
+
+    /**
+     * If set true, the client will connect to ZK during initialization.
+     * Otherwise, user has to call connect() method explicitly before talking to ZK.
+     */
+    public Builder setConnectOnInit(boolean connectOnInit) {
+      _connectOnInit = connectOnInit;
+      return this;
+    }
 
     public Builder setConnection(IZkConnection connection) {
       this._connection = connection;
@@ -271,7 +291,7 @@ public class ZkClient extends org.apache.helix.zookeeper.zkclient.ZkClient imple
       }
 
       return new ZkClient(_connection, _connectionTimeout, _operationRetryTimeout, _zkSerializer,
-          _monitorType, _monitorKey, _monitorInstanceName, _monitorRootPathOnly);
+          _monitorType, _monitorKey, _monitorInstanceName, _monitorRootPathOnly, _connectOnInit);
     }
   }
 }
diff --git a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/IZkDataListener.java b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/IZkDataListener.java
index 6d90e8de8..0a3e8b3e5 100644
--- a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/IZkDataListener.java
+++ b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/IZkDataListener.java
@@ -19,6 +19,9 @@ package org.apache.helix.zookeeper.zkclient;
  * under the License.
  */
 
+import org.apache.zookeeper.Watcher;
+
+
 /**
  * An {@link IZkDataListener} can be registered at a {@link ZkClient} for listening on zk data changes for a given path.
  *
@@ -31,4 +34,8 @@ public interface IZkDataListener {
     public void handleDataChange(String dataPath, Object data) throws Exception;
 
     public void handleDataDeleted(String dataPath) throws Exception;
+
+    default void handleDataChange(String dataPath, Object data, Watcher.Event.EventType eventType) throws Exception {
+        handleDataChange(dataPath, data);
+    }
 }
diff --git a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/ZkClient.java b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/ZkClient.java
index 527e46f90..4fca90da9 100644
--- a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/ZkClient.java
+++ b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/ZkClient.java
@@ -214,7 +214,7 @@ public class ZkClient implements Watcher {
 
   protected ZkClient(IZkConnection zkConnection, int connectionTimeout, long operationRetryTimeout,
       PathBasedZkSerializer zkSerializer, String monitorType, String monitorKey,
-      String monitorInstanceName, boolean monitorRootPathOnly) {
+      String monitorInstanceName, boolean monitorRootPathOnly, boolean connectOnInit) {
     if (zkConnection == null) {
       throw new NullPointerException("Zookeeper connection is null!");
     }
@@ -240,17 +240,18 @@ public class ZkClient implements Watcher {
       LOG.info("ZkClient monitor key or type is not provided. Skip monitoring.");
     }
 
-    connect(connectionTimeout, this);
-
-    try {
-      if (_monitor != null) {
-        _monitor.register();
-      }
-    } catch (JMException e){
-      LOG.error("Error in creating ZkClientMonitor", e);
+    if (connectOnInit) {
+      connect(connectionTimeout, this);
     }
   }
 
+  protected ZkClient(IZkConnection zkConnection, int connectionTimeout, long operationRetryTimeout,
+      PathBasedZkSerializer zkSerializer, String monitorType, String monitorKey,
+      String monitorInstanceName, boolean monitorRootPathOnly) {
+    this(zkConnection, connectionTimeout, operationRetryTimeout, zkSerializer, monitorType, monitorKey,
+        monitorInstanceName, monitorRootPathOnly, true);
+  }
+
   public List<String> subscribeChildChanges(String path, IZkChildListener listener) {
     ChildrenSubscribeResult result = subscribeChildChanges(path, listener, false);
     return result.getChildren();
@@ -1312,13 +1313,13 @@ public class ZkClient implements Watcher {
     }
   }
 
-  private void fireAllEvents() {
+  private void fireAllEvents(WatchedEvent event) {
     //TODO: During handling new session, if the path is deleted, watcher leakage could still happen
     for (Entry<String, Set<IZkChildListener>> entry : _childListener.entrySet()) {
       fireChildChangedEvents(entry.getKey(), entry.getValue(), true);
     }
     for (Entry<String, Set<IZkDataListenerEntry>> entry : _dataListener.entrySet()) {
-      fireDataChangedEvents(entry.getKey(), entry.getValue(), OptionalLong.empty(), true);
+      fireDataChangedEvents(entry.getKey(), entry.getValue(), OptionalLong.empty(), true, event.getType());
     }
   }
 
@@ -1518,7 +1519,7 @@ public class ZkClient implements Watcher {
          * reconnecting when the session expired. Because previous session expired, we also have to
          * notify all listeners that something might have changed.
          */
-        fireAllEvents();
+        fireAllEvents(event);
       }
     } else if (event.getState() == KeeperState.Expired) {
       _isNewSessionEventFired = false;
@@ -1766,13 +1767,13 @@ public class ZkClient implements Watcher {
       Set<IZkDataListenerEntry> listeners = _dataListener.get(path);
       if (listeners != null && !listeners.isEmpty()) {
         fireDataChangedEvents(event.getPath(), listeners, OptionalLong.of(notificationTime),
-            pathExists);
+            pathExists, event.getType());
       }
     }
   }
 
   private void fireDataChangedEvents(final String path, Set<IZkDataListenerEntry> listeners,
-      final OptionalLong notificationTime, boolean pathExists) {
+      final OptionalLong notificationTime, boolean pathExists, EventType eventType) {
     try {
       final ZkPathStatRecord pathStatRecord = new ZkPathStatRecord(path);
       // Trigger listener callbacks
@@ -1815,7 +1816,7 @@ public class ZkClient implements Watcher {
                   return;
                 }
               }
-              listener.getDataListener().handleDataChange(path, data);
+              listener.getDataListener().handleDataChange(path, data, eventType);
             }
           }
         });
@@ -2488,6 +2489,11 @@ public class ZkClient implements Watcher {
     });
   }
 
+  protected void connect(final long maxMsToWaitUntilConnected)
+      throws ZkInterruptedException, ZkTimeoutException, IllegalStateException {
+    connect(maxMsToWaitUntilConnected, this);
+  }
+
   /**
    * Connect to ZooKeeper.
    * @param maxMsToWaitUntilConnected
@@ -2553,6 +2559,13 @@ public class ZkClient implements Watcher {
         close();
       }
     }
+    try {
+      if (_monitor != null) {
+        _monitor.register();
+      }
+    } catch (JMException e){
+      LOG.error("Error in creating ZkClientMonitor", e);
+    }
   }
 
   public long getCreationTime(String path) {
diff --git a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/metric/ZkClientMonitor.java b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/metric/ZkClientMonitor.java
index d0a37bb6e..aad5eb76d 100644
--- a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/metric/ZkClientMonitor.java
+++ b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/metric/ZkClientMonitor.java
@@ -55,6 +55,7 @@ public class ZkClientMonitor extends DynamicMBeanProvider {
   private String _monitorKey;
   private String _monitorInstanceName;
   private boolean _monitorRootOnly;
+  private volatile boolean _registered = false;
 
   private SimpleDynamicMetric<Long> _stateChangeEventCounter;
   private SimpleDynamicMetric<Long> _expiredSessionCounter;
@@ -123,6 +124,9 @@ public class ZkClientMonitor extends DynamicMBeanProvider {
 
   @Override
   public DynamicMBeanProvider register() throws JMException {
+    if (_registered) {
+      return this;
+    }
     List<DynamicMetric<?, ?>> attributeList = new ArrayList<>();
     attributeList.add(_dataChangeEventCounter);
     attributeList.add(_outstandingRequestGauge);
@@ -143,6 +147,7 @@ public class ZkClientMonitor extends DynamicMBeanProvider {
         }
       }
     });
+    _registered = true;
     return this;
   }
 
@@ -154,6 +159,7 @@ public class ZkClientMonitor extends DynamicMBeanProvider {
     for (ZkClientPathMonitor zkClientPathMonitor : _zkClientPathMonitorMap.values()) {
       zkClientPathMonitor.unregister();
     }
+    _registered = false;
   }
 
   @Override