You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by xy...@apache.org on 2023/04/27 20:36:18 UTC
[helix] 09/37: Implement data change listener for ZkMetaClient and test
This is an automated email from the ASF dual-hosted git repository.
xyuanlu pushed a commit to branch metaclient
in repository https://gitbox.apache.org/repos/asf/helix.git
commit db68826154a996177487f8433d99e6d082049aba
Author: Qi (Quincy) Qu <qq...@linkedin.com>
AuthorDate: Thu Jan 19 17:15:37 2023 -0500
Implement data change listener for ZkMetaClient and test
* Implement data change listener for ZkMetaClient and test
---
.../helix/metaclient/impl/zk/ZkMetaClient.java | 11 +-
.../helix/metaclient/impl/zk/TestZkMetaClient.java | 119 ++++++++++++++++++++-
.../apache/helix/zookeeper/zkclient/ZkClient.java | 2 +-
3 files changed, 128 insertions(+), 4 deletions(-)
diff --git a/meta-client/src/main/java/org/apache/helix/metaclient/impl/zk/ZkMetaClient.java b/meta-client/src/main/java/org/apache/helix/metaclient/impl/zk/ZkMetaClient.java
index 2fada4b28..00ad18f73 100644
--- a/meta-client/src/main/java/org/apache/helix/metaclient/impl/zk/ZkMetaClient.java
+++ b/meta-client/src/main/java/org/apache/helix/metaclient/impl/zk/ZkMetaClient.java
@@ -22,6 +22,7 @@ package org.apache.helix.metaclient.impl.zk;
import java.util.List;
import java.util.concurrent.TimeUnit;
+import org.apache.commons.lang3.NotImplementedException;
import org.apache.helix.metaclient.api.AsyncCallback;
import org.apache.helix.metaclient.api.ChildChangeListener;
import org.apache.helix.metaclient.api.ConnectStateChangeListener;
@@ -50,10 +51,12 @@ import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.server.EphemeralType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
public class ZkMetaClient<T> implements MetaClientInterface<T>, AutoCloseable {
-
+ private static final Logger LOG = LoggerFactory.getLogger(ZkMetaClient.class);
private final ZkClient _zkClient;
private final int _connectionTimeout;
@@ -255,6 +258,10 @@ public class ZkMetaClient<T> implements MetaClientInterface<T>, AutoCloseable {
@Override
public boolean subscribeDataChange(String key, DataChangeListener listener,
boolean skipWatchingNonExistNode, boolean persistListener) {
+ if (!persistListener) {
+ throw new NotImplementedException("Currently the non-persist (one-time) listener is not supported in ZkMetaClient.");
+ }
+ _zkClient.subscribeDataChanges(key, new DataListenerConverter(listener));
return false;
}
@@ -279,7 +286,7 @@ public class ZkMetaClient<T> implements MetaClientInterface<T>, AutoCloseable {
/**
 * Unsubscribes {@code listener} from data changes on {@code key} by delegating to
 * {@code _zkClient.unsubscribeDataChanges}.
 *
 * NOTE(review): the listener is wrapped in a newly created DataListenerConverter, so removal
 * presumably depends on DataListenerConverter defining equals/hashCode in terms of the wrapped
 * listener -- confirm against that class.
 */
@Override
public void unsubscribeDataChange(String key, DataChangeListener listener) {
-
+ _zkClient.unsubscribeDataChanges(key, new DataListenerConverter(listener));
}
@Override
diff --git a/meta-client/src/test/java/org/apache/helix/metaclient/impl/zk/TestZkMetaClient.java b/meta-client/src/test/java/org/apache/helix/metaclient/impl/zk/TestZkMetaClient.java
index d822c2fa8..553ce22e0 100644
--- a/meta-client/src/test/java/org/apache/helix/metaclient/impl/zk/TestZkMetaClient.java
+++ b/meta-client/src/test/java/org/apache/helix/metaclient/impl/zk/TestZkMetaClient.java
@@ -30,6 +30,14 @@ import org.apache.commons.io.FileUtils;
import org.apache.helix.metaclient.api.DataUpdater;
import org.apache.helix.metaclient.api.MetaClientInterface;
import org.apache.helix.metaclient.constants.MetaClientException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.commons.lang3.NotImplementedException;
+import org.apache.helix.metaclient.api.DataChangeListener;
import org.apache.helix.metaclient.impl.zk.factory.ZkMetaClientConfig;
import org.apache.helix.zookeeper.zkclient.IDefaultNameSpace;
import org.apache.helix.zookeeper.zkclient.ZkServer;
@@ -44,8 +52,11 @@ import static org.apache.helix.metaclient.api.MetaClientInterface.EntryMode.PERS
public class TestZkMetaClient {
private static final String ZK_ADDR = "localhost:2183";
- private ZkServer _zkServer;
+ private static final int DEFAULT_TIMEOUT_MS = 1000;
private static final String ENTRY_STRING_VALUE = "test-value";
+ private final Object _syncObject = new Object();
+
+ private ZkServer _zkServer;
@BeforeClass
public void prepare() {
@@ -196,7 +207,91 @@ public class TestZkMetaClient {
}
}
+  /**
+   * Verifies that a persistent data change listener is notified when the entry is created,
+   * updated and deleted, that it is no longer notified after being unsubscribed, and that
+   * registering a one-time listener is rejected with NotImplementedException.
+   */
+  @Test
+  public void testDataChangeListenerTriggerWithZkWatcher() throws Exception {
+    final String path = "/TestZkMetaClient_testTriggerWithZkWatcher";
+    try (ZkMetaClient<String> zkMetaClient = createZkMetaClient()) {
+      zkMetaClient.connect();
+      MockDataChangeListener listener = new MockDataChangeListener();
+      zkMetaClient.subscribeDataChange(path, listener, false, true);
+      int expectedCallCount = 0;
+
+      zkMetaClient.create(path, "test-node");
+      awaitTrigger(listener, ++expectedCallCount, DataChangeListener.ChangeType.ENTRY_CREATED);
+
+      zkMetaClient.set(path, "test-node-changed", -1);
+      awaitTrigger(listener, ++expectedCallCount, DataChangeListener.ChangeType.ENTRY_UPDATE);
+
+      zkMetaClient.delete(path);
+      awaitTrigger(listener, ++expectedCallCount, DataChangeListener.ChangeType.ENTRY_DELETED);
+
+      // Unregister the listener; no further callbacks are expected.
+      zkMetaClient.unsubscribeDataChange(path, listener);
+      zkMetaClient.create(path, "test-node");
+      synchronized (_syncObject) {
+        _syncObject.wait(DEFAULT_TIMEOUT_MS);
+        Assert.assertEquals(listener.getTriggeredCount(), expectedCallCount);
+      }
+
+      // Registering a non-persistent (one-time) listener must be rejected.
+      try {
+        zkMetaClient.subscribeDataChange(path, new MockDataChangeListener(), false, false);
+        Assert.fail("One-time listener is not supported, NotImplementedException should be thrown.");
+      } catch (NotImplementedException e) {
+        // expected
+      }
+    }
+  }
+
+  /**
+   * Waits until {@code listener} has been triggered {@code expectedCallCount} times, then asserts
+   * the trigger count and the type of the last event. The wait is bounded by an overall deadline
+   * so that a missing callback fails the assertion instead of blocking the test forever.
+   */
+  private void awaitTrigger(MockDataChangeListener listener, int expectedCallCount,
+      DataChangeListener.ChangeType expectedType) throws InterruptedException {
+    final long deadlineNanos =
+        System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(5L * DEFAULT_TIMEOUT_MS);
+    synchronized (_syncObject) {
+      // Re-check the condition after every wake-up: wait() may return on timeout or spuriously.
+      while (listener.getTriggeredCount() < expectedCallCount) {
+        long remainingMs = TimeUnit.NANOSECONDS.toMillis(deadlineNanos - System.nanoTime());
+        if (remainingMs <= 0) {
+          break;
+        }
+        _syncObject.wait(Math.min(remainingMs, DEFAULT_TIMEOUT_MS));
+      }
+      Assert.assertEquals(listener.getTriggeredCount(), expectedCallCount);
+      Assert.assertEquals(listener.getLastEventType(), expectedType);
+    }
+  }
+  /**
+   * Registers several listeners on each of two paths, updates one of the paths and verifies that
+   * every listener on that path is invoked with the expected data.
+   */
+  @Test(dependsOnMethods = "testDataChangeListenerTriggerWithZkWatcher")
+  public void testMultipleDataChangeListeners() throws Exception {
+    final String basePath = "/TestZkMetaClient_testMultipleDataChangeListeners";
+    final int count = 5;
+    final String testData = "test-data";
+    final AtomicBoolean dataExpected = new AtomicBoolean(true);
+    try (ZkMetaClient<String> zkMetaClient = createZkMetaClient()) {
+      zkMetaClient.connect();
+      Map<String, Set<DataChangeListener>> listeners = new HashMap<>();
+      CountDownLatch countDownLatch = new CountDownLatch(count);
+      zkMetaClient.create(basePath + "_1", testData);
+      // Subscribe to two paths; only basePath + "_1" exists and is updated below.
+      for (int i = 0; i < 2; i++) {
+        String path = basePath + "_" + i;
+        listeners.put(path, new HashSet<>());
+        // 5 listeners for each path
+        for (int j = 0; j < count; j++) {
+          DataChangeListener listener = new DataChangeListener() {
+            @Override
+            public void handleDataChange(String key, Object data, ChangeType changeType) {
+              // Record a mismatch BEFORE releasing the latch; otherwise the test thread could
+              // pass the await() and read dataExpected before this callback has updated it.
+              // The flag only ever moves from true to false, so a plain set is race-free.
+              if (!testData.equals(data)) {
+                dataExpected.set(false);
+              }
+              countDownLatch.countDown();
+            }
+          };
+          listeners.get(path).add(listener);
+          zkMetaClient.subscribeDataChange(path, listener, false, true);
+        }
+      }
+      zkMetaClient.set(basePath + "_1", testData, -1);
+      Assert.assertTrue(countDownLatch.await(5000, TimeUnit.MILLISECONDS));
+      Assert.assertTrue(dataExpected.get());
+    }
+  }
private static ZkMetaClient<String> createZkMetaClient() {
ZkMetaClientConfig config =
@@ -226,4 +321,26 @@ public class TestZkMetaClient {
zkServer.start();
return zkServer;
}
+
+  /**
+   * Test listener that counts its invocations, remembers the type of the most recent change and
+   * wakes up any thread waiting on the enclosing test's {@code _syncObject}.
+   */
+  private class MockDataChangeListener implements DataChangeListener {
+    private final AtomicInteger _triggeredCount = new AtomicInteger(0);
+    private volatile ChangeType _lastEventType;
+
+    @Override
+    public void handleDataChange(String key, Object data, ChangeType changeType) {
+      // Update both fields while holding the monitor the test thread waits on, and publish the
+      // event type before the count. A waiter that wakes up on timeout can then never observe
+      // the new count together with the previous event type.
+      synchronized (_syncObject) {
+        _lastEventType = changeType;
+        _triggeredCount.getAndIncrement();
+        _syncObject.notifyAll();
+      }
+    }
+
+    /** Returns how many times this listener has been invoked so far. */
+    int getTriggeredCount() {
+      return _triggeredCount.get();
+    }
+
+    /** Returns the change type passed to the most recent invocation, or null if none yet. */
+    ChangeType getLastEventType() {
+      return _lastEventType;
+    }
+  }
}
diff --git a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/ZkClient.java b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/ZkClient.java
index 4fca90da9..747b7e7a0 100644
--- a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/ZkClient.java
+++ b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/ZkClient.java
@@ -2489,7 +2489,7 @@ public class ZkClient implements Watcher {
});
}
/**
 * Convenience overload that delegates to the two-argument {@code connect} overload, passing this
 * client instance as the second argument.
 *
 * @param maxMsToWaitUntilConnected forwarded unchanged to the two-argument overload; presumably
 *        the maximum time in milliseconds to wait for the connection -- confirm there
 */
- protected void connect(final long maxMsToWaitUntilConnected)
+ public void connect(final long maxMsToWaitUntilConnected)
throws ZkInterruptedException, ZkTimeoutException, IllegalStateException {
connect(maxMsToWaitUntilConnected, this);
}