You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by xy...@apache.org on 2023/04/27 20:36:18 UTC

[helix] 09/37: Implement data change listener for ZkMetaClient and test

This is an automated email from the ASF dual-hosted git repository.

xyuanlu pushed a commit to branch metaclient
in repository https://gitbox.apache.org/repos/asf/helix.git

commit db68826154a996177487f8433d99e6d082049aba
Author: Qi (Quincy) Qu <qq...@linkedin.com>
AuthorDate: Thu Jan 19 17:15:37 2023 -0500

    Implement data change listener for ZkMetaClient and test
    
    * Implement data change listener for ZkMetaClient and test
---
 .../helix/metaclient/impl/zk/ZkMetaClient.java     |  11 +-
 .../helix/metaclient/impl/zk/TestZkMetaClient.java | 119 ++++++++++++++++++++-
 .../apache/helix/zookeeper/zkclient/ZkClient.java  |   2 +-
 3 files changed, 128 insertions(+), 4 deletions(-)

diff --git a/meta-client/src/main/java/org/apache/helix/metaclient/impl/zk/ZkMetaClient.java b/meta-client/src/main/java/org/apache/helix/metaclient/impl/zk/ZkMetaClient.java
index 2fada4b28..00ad18f73 100644
--- a/meta-client/src/main/java/org/apache/helix/metaclient/impl/zk/ZkMetaClient.java
+++ b/meta-client/src/main/java/org/apache/helix/metaclient/impl/zk/ZkMetaClient.java
@@ -22,6 +22,7 @@ package org.apache.helix.metaclient.impl.zk;
 import java.util.List;
 import java.util.concurrent.TimeUnit;
 
+import org.apache.commons.lang3.NotImplementedException;
 import org.apache.helix.metaclient.api.AsyncCallback;
 import org.apache.helix.metaclient.api.ChildChangeListener;
 import org.apache.helix.metaclient.api.ConnectStateChangeListener;
@@ -50,10 +51,12 @@ import org.apache.zookeeper.CreateMode;
 import org.apache.zookeeper.KeeperException;
 import org.apache.zookeeper.Watcher;
 import org.apache.zookeeper.server.EphemeralType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 
 public class ZkMetaClient<T> implements MetaClientInterface<T>, AutoCloseable {
-
+  private static final Logger LOG  = LoggerFactory.getLogger(ZkMetaClient.class);
   private final ZkClient _zkClient;
   private final int _connectionTimeout;
 
@@ -255,6 +258,10 @@ public class ZkMetaClient<T> implements MetaClientInterface<T>, AutoCloseable {
   @Override
   public boolean subscribeDataChange(String key, DataChangeListener listener,
       boolean skipWatchingNonExistNode, boolean persistListener) {
-    return false;
+    if (!persistListener) {
+      throw new NotImplementedException("Currently the non-persist (one-time) listener is not supported in ZkMetaClient.");
+    }
+    _zkClient.subscribeDataChanges(key, new DataListenerConverter(listener)); // NOTE(review): skipWatchingNonExistNode is not forwarded here - confirm that is intended
+    return true; // the listener has been registered, so report success to the caller
   }
 
@@ -279,7 +286,7 @@ public class ZkMetaClient<T> implements MetaClientInterface<T>, AutoCloseable {
 
   @Override
   public void unsubscribeDataChange(String key, DataChangeListener listener) {
-
+    _zkClient.unsubscribeDataChanges(key, new DataListenerConverter(listener)); // NOTE(review): a fresh wrapper is passed, so removal presumably relies on DataListenerConverter equality - confirm
   }
 
   @Override
diff --git a/meta-client/src/test/java/org/apache/helix/metaclient/impl/zk/TestZkMetaClient.java b/meta-client/src/test/java/org/apache/helix/metaclient/impl/zk/TestZkMetaClient.java
index d822c2fa8..553ce22e0 100644
--- a/meta-client/src/test/java/org/apache/helix/metaclient/impl/zk/TestZkMetaClient.java
+++ b/meta-client/src/test/java/org/apache/helix/metaclient/impl/zk/TestZkMetaClient.java
@@ -30,6 +30,14 @@ import org.apache.commons.io.FileUtils;
 import org.apache.helix.metaclient.api.DataUpdater;
 import org.apache.helix.metaclient.api.MetaClientInterface;
 import org.apache.helix.metaclient.constants.MetaClientException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.commons.lang3.NotImplementedException;
+import org.apache.helix.metaclient.api.DataChangeListener;
 import org.apache.helix.metaclient.impl.zk.factory.ZkMetaClientConfig;
 import org.apache.helix.zookeeper.zkclient.IDefaultNameSpace;
 import org.apache.helix.zookeeper.zkclient.ZkServer;
@@ -44,8 +52,11 @@ import static org.apache.helix.metaclient.api.MetaClientInterface.EntryMode.PERS
 public class TestZkMetaClient {
 
   private static final String ZK_ADDR = "localhost:2183";
-  private ZkServer _zkServer;
+  private static final int DEFAULT_TIMEOUT_MS = 1000;
   private static final String ENTRY_STRING_VALUE = "test-value";
+  private final Object _syncObject = new Object();
+
+  private ZkServer _zkServer;
 
   @BeforeClass
   public void prepare() {
@@ -196,7 +207,91 @@ public class TestZkMetaClient {
     }
   }
 
+  @Test
+  public void testDataChangeListenerTriggerWithZkWatcher() throws Exception { // create, update and delete must each fire the listener once
+    final String path = "/TestZkMetaClient_testTriggerWithZkWatcher";
+    try (ZkMetaClient<String> zkMetaClient = createZkMetaClient()) {
+      zkMetaClient.connect();
+      MockDataChangeListener listener = new MockDataChangeListener();
+      zkMetaClient.subscribeDataChange(path, listener, false, true);
+      zkMetaClient.create(path, "test-node");
+      int expectedCallCount = 0;
+      synchronized (_syncObject) {
+        for (int waits = 0; waits < 10 && listener.getTriggeredCount() == expectedCallCount; waits++) {
+          _syncObject.wait(DEFAULT_TIMEOUT_MS); // bounded: after 10 timeouts fall through and let the assertion fail instead of hanging
+        }
+        expectedCallCount++;
+        Assert.assertEquals(listener.getTriggeredCount(), expectedCallCount);
+        Assert.assertEquals(listener.getLastEventType(), DataChangeListener.ChangeType.ENTRY_CREATED);
+      }
+      zkMetaClient.set(path, "test-node-changed", -1);
+      synchronized (_syncObject) {
+        for (int waits = 0; waits < 10 && listener.getTriggeredCount() == expectedCallCount; waits++) {
+          _syncObject.wait(DEFAULT_TIMEOUT_MS);
+        }
+        expectedCallCount++;
+        Assert.assertEquals(listener.getTriggeredCount(), expectedCallCount);
+        Assert.assertEquals(listener.getLastEventType(), DataChangeListener.ChangeType.ENTRY_UPDATE);
+      }
+      zkMetaClient.delete(path);
+      synchronized (_syncObject) {
+        for (int waits = 0; waits < 10 && listener.getTriggeredCount() == expectedCallCount; waits++) {
+          _syncObject.wait(DEFAULT_TIMEOUT_MS);
+        }
+        expectedCallCount++;
+        Assert.assertEquals(listener.getTriggeredCount(), expectedCallCount);
+        Assert.assertEquals(listener.getLastEventType(), DataChangeListener.ChangeType.ENTRY_DELETED);
+      }
+      // unregister the listener; re-creating the node must not trigger it again
+      zkMetaClient.unsubscribeDataChange(path, listener);
+      zkMetaClient.create(path, "test-node");
+      synchronized (_syncObject) {
+        _syncObject.wait(DEFAULT_TIMEOUT_MS); // give a stray callback time to arrive before checking the count
+        Assert.assertEquals(listener.getTriggeredCount(), expectedCallCount);
+      }
+      // registering a non-persistent (one-time) listener must be rejected
+      try {
+        zkMetaClient.subscribeDataChange(path, new MockDataChangeListener(), false, false);
+        Assert.fail("One-time listener is not supported, NotImplementedException should be thrown.");
+      } catch (NotImplementedException e) {
+        // expected
+      }
+    }
+  }
 
+  @Test(dependsOnMethods = "testDataChangeListenerTriggerWithZkWatcher")
+  public void testMultipleDataChangeListeners() throws Exception { // every listener on the updated path must be called with the new data
+    final String basePath = "/TestZkMetaClient_testMultipleDataChangeListeners";
+    final int count = 5;
+    final String testData = "test-data";
+    final AtomicBoolean dataExpected = new AtomicBoolean(true);
+    try (ZkMetaClient<String> zkMetaClient = createZkMetaClient()) {
+      zkMetaClient.connect();
+      Map<String, Set<DataChangeListener>> listeners = new HashMap<>();
+      CountDownLatch countDownLatch = new CountDownLatch(count);
+      zkMetaClient.create(basePath + "_1", testData);
+      // register listeners on two paths; only "_1" is created and updated below, and the latch expects `count` callbacks
+      for (int i = 0; i < 2; i++) {
+        String path = basePath + "_" + i;
+        listeners.put(path, new HashSet<>());
+        // 5 listeners for each path
+        for (int j = 0; j < count; j++) {
+          DataChangeListener listener = new DataChangeListener() {
+            @Override
+            public void handleDataChange(String key, Object data, ChangeType changeType) {
+              dataExpected.compareAndSet(true, testData.equals(data)); // record the check atomically; once false it stays false
+              countDownLatch.countDown(); // release the latch last so the waiting thread sees every recorded result
+            }
+          };
+          listeners.get(path).add(listener);
+          zkMetaClient.subscribeDataChange(path, listener, false, true);
+        }
+      }
+      zkMetaClient.set(basePath + "_1", testData, -1);
+      Assert.assertTrue(countDownLatch.await(5000, TimeUnit.MILLISECONDS));
+      Assert.assertTrue(dataExpected.get());
+    }
+  }
 
   private static ZkMetaClient<String> createZkMetaClient() {
     ZkMetaClientConfig config =
@@ -226,4 +321,26 @@ public class TestZkMetaClient {
     zkServer.start();
     return zkServer;
   }
+
+  private class MockDataChangeListener implements DataChangeListener {
+    private final AtomicInteger _triggeredCount = new AtomicInteger(0);
+    private volatile ChangeType _lastEventType;
+    /** Records the callback, then wakes any test thread waiting on {@code _syncObject}. */
+    @Override
+    public void handleDataChange(String key, Object data, ChangeType changeType) {
+      _lastEventType = changeType; // write the type first: a reader that sees the new count must also see this type
+      _triggeredCount.getAndIncrement();
+      synchronized (_syncObject) {
+        _syncObject.notifyAll();
+      }
+    }
+
+    int getTriggeredCount() {
+      return _triggeredCount.get();
+    }
+
+    ChangeType getLastEventType() {
+      return _lastEventType;
+    }
+  }
 }
diff --git a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/ZkClient.java b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/ZkClient.java
index 4fca90da9..747b7e7a0 100644
--- a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/ZkClient.java
+++ b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/ZkClient.java
@@ -2489,7 +2489,7 @@ public class ZkClient implements Watcher {
     });
   }
 
-  protected void connect(final long maxMsToWaitUntilConnected)
+  public void connect(final long maxMsToWaitUntilConnected) // NOTE(review): widened from protected by this change, presumably so ZkMetaClient can call it - confirm
       throws ZkInterruptedException, ZkTimeoutException, IllegalStateException {
     connect(maxMsToWaitUntilConnected, this);
   }